summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-05 11:56:06 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-05 13:43:50 +0200
commit44173c342d698441fbbcba4685c78f9bee40d138 (patch)
tree1878eb31b3156e8ba387946795a3d2789e83b78c
parentec3b5b462c1b8296ba18a3872f56d569fa70e45b (diff)
feat(sim/core): Add Java-based simulator core
This change introduces a new class, `SimulationScheduler`, which provides the basis for simulations in OpenDC by allowing components to schedule future tasks using delay-skipping queue and a virtual clock. This new class is written in Java to remove any dependency on the Kotlin and kotlinx-coroutines runtime when not necessary.
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java246
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java265
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt106
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt231
4 files changed, 848 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
new file mode 100644
index 00000000..a70c1cda
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.concurrent.Executor;
+
+/**
+ * A scheduler is used by simulations to manage execution of (future) tasks, providing a controllable (virtual) clock to
+ * skip over delays.
+ *
+ * <p>
+ * The scheduler can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
+ * virtual time as needed (via {@link #advanceUntilIdle}), or run the tasks that are scheduled to run as soon as
+ * possible but have not yet been dispatched (via {@link #runCurrent}). These methods execute the pending tasks using
+ * a single thread.
+ *
+ * <p>
+ * This class is not thread-safe and must not be used concurrently by multiple threads.
+ */
+public final class SimulationScheduler implements Executor {
+ /**
+ * The {@link TaskQueue} containing the pending tasks.
+ */
+ private final TaskQueue queue = new TaskQueue();
+
+ /**
+ * The current time of the scheduler in milliseconds since epoch.
+ */
+ private long currentTime;
+
+ /**
+ * A counter to establish total order on the events that happen at the same virtual time.
+ */
+ private int count = 0;
+
+ /**
+ * The {@link Clock} instance linked to this scheduler.
+ */
+ private final SimulationClock clock = new SimulationClock(this, ZoneId.systemDefault());
+
+ /**
+ * Construct a {@link SimulationScheduler} instance with the specified initial time.
+ *
+ * @param initialTimeMs The initial virtual time of the scheduler in milliseconds since epoch.
+ */
+ public SimulationScheduler(long initialTimeMs) {
+ this.currentTime = initialTimeMs;
+ }
+
+ /**
+ * Construct a {@link SimulationScheduler} instance with the initial time set to UNIX Epoch 0.
+ */
+ public SimulationScheduler() {
+ this(0);
+ }
+
+ /**
+ * Return the virtual clock associated with this dispatcher.
+ *
+ * @return A {@link Clock} tracking the virtual time of the dispatcher.
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Return the current virtual timestamp of the dispatcher (in milliseconds since epoch).
+ *
+ * @return A long value representing the virtual timestamp of the dispatcher in milliseconds since epoch.
+ */
+ public long getCurrentTime() {
+ return currentTime;
+ }
+
+ /**
+ * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
+ *
+ * @param delayMs The time from now until the execution of the task (in milliseconds).
+ * @param task The task to execute after the delay.
+ * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
+ */
+ public int schedule(long delayMs, Runnable task) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ int id = count++;
+ queue.add(target, id, task);
+ return id;
+ }
+
+ /**
+ * Cancel a pending task.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task (returned by {@link #schedule(long, Runnable)}).
+ * @return A boolean indicating whether a task was actually cancelled.
+ */
+ public boolean cancel(long deadline, int id) {
+ return queue.remove(deadline, id);
+ }
+
+ /**
+ * Run the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more
+ * tasks in the queue of this scheduler.
+ */
+ public void advanceUntilIdle() {
+ final TaskQueue queue = this.queue;
+
+ while (true) {
+ long deadline = queue.peekDeadline();
+ Runnable task = queue.poll();
+
+ if (task == null) {
+ break;
+ }
+
+ currentTime = deadline;
+ task.run();
+ }
+ }
+
+ /**
+ * Move the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the
+ * meantime.
+ *
+ * @param delayMs The amount of time to move the virtual clock forward (in milliseconds).
+ * @throws IllegalStateException if passed a negative <code>delay</code>.
+ */
+ public void advanceBy(long delayMs) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("Can not advance time by a negative delay: " + delayMs + " ms");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ final TaskQueue queue = this.queue;
+ long deadline;
+
+ while ((deadline = queue.peekDeadline()) < target) {
+ Runnable task = queue.poll(); // Cannot be null since while condition is always false on an empty queue
+
+ task.run();
+ currentTime = deadline;
+ }
+
+ currentTime = target;
+ }
+
+ /**
+ * Execute the tasks that are scheduled to execute at this moment of virtual time.
+ */
+ public void runCurrent() {
+ final TaskQueue queue = this.queue;
+ long currentTime = this.currentTime;
+
+ while (queue.peekDeadline() == currentTime) {
+ Runnable task = queue.poll();
+
+ if (task == null) {
+ break;
+ }
+
+ task.run();
+ }
+ }
+
+ /**
+ * Schedule the specified command to run at this moment of virtual time.
+ *
+ * @param command The command to execute.
+ */
+ @Override
+ public void execute(Runnable command) {
+ schedule(0, command);
+ }
+
+ /**
+ * A {@link Clock} implementation for a {@link SimulationScheduler}.
+ */
+ private static class SimulationClock extends Clock {
+ private final SimulationScheduler scheduler;
+ private final ZoneId zone;
+
+ SimulationClock(SimulationScheduler scheduler, ZoneId zone) {
+ this.scheduler = scheduler;
+ this.zone = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zone;
+ }
+
+ @Override
+ public Clock withZone(ZoneId zoneId) {
+ return new SimulationClock(scheduler, zone);
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(scheduler.currentTime);
+ }
+
+ @Override
+ public long millis() {
+ return scheduler.currentTime;
+ }
+
+ @Override
+ public String toString() {
+ return "SimulationClock[time=" + millis() + "ms]";
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java
new file mode 100644
index 00000000..7d867b5d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.util.Arrays;
+
+/**
+ * Specialized priority queue for pending tasks.
+ *
+ * <p>
+ * This class uses a specialized priority queue (as opposed to a generic {@link java.util.PriorityQueue}), which reduces
+ * unnecessary allocations in the simulator's hot path.
+ */
+final class TaskQueue {
+ /**
+ * The deadlines of the pending tasks.
+ */
+ private long[] deadlines;
+
+ /**
+ * The identifiers of the pending tasks. Identifiers are used to provide a total order for pending tasks in case
+ * the deadline of two tasks is the same.
+ */
+ private int[] ids;
+
+ /**
+ * The {@link Runnable}s representing the tasks that have been scheduled.
+ */
+ private Runnable[] tasks;
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private int size = 0;
+
+ /**
+ * Construct a {@link TaskQueue} with the specified initial capacity.
+ *
+ * @param initialCapacity The initial capacity of the queue.
+ */
+ public TaskQueue(int initialCapacity) {
+ this.deadlines = new long[initialCapacity];
+ this.ids = new int[initialCapacity];
+ this.tasks = new Runnable[initialCapacity];
+ }
+
+ /**
+ * Construct a {@link TaskQueue} with an initial capacity of 256 elements.
+ */
+ public TaskQueue() {
+ this(256);
+ }
+
+ /**
+ * Add a new task to this queue.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task.
+ * @param task The {@link Runnable} representing the task to execute.
+ */
+ public void add(long deadline, int id, Runnable task) {
+ int i = size;
+ long[] deadlines = this.deadlines;
+
+ if (i >= deadlines.length) {
+ grow();
+
+ // Re-fetch the resized array
+ deadlines = this.deadlines;
+ }
+
+ siftUp(deadlines, ids, tasks, i, deadline, id, task);
+
+ size = i + 1;
+ }
+
+ /**
+ * Retrieve the next task to be executed.
+ *
+ * @return The head of the queue or <code>null</code> if the queue is empty.
+ */
+ public Runnable poll() {
+ final Runnable[] tasks = this.tasks;
+ final Runnable result = tasks[0];
+
+ if (result != null) {
+ int n = --size;
+
+ if (n > 0) {
+ long[] deadlines = this.deadlines;
+ int[] ids = this.ids;
+
+ siftDown(deadlines, ids, tasks, 0, n, deadlines[n], ids[n], tasks[n]);
+ }
+
+ // Clear the last element of the queue
+ tasks[n] = null;
+ }
+
+ return result;
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ *
+ * @return The earliest deadline in the queue or {@link Long#MAX_VALUE} if the queue is empty.
+ */
+ public long peekDeadline() {
+ if (size == 0) {
+ return Long.MAX_VALUE;
+ }
+
+ return deadlines[0];
+ }
+
+ /**
+ * Remove the timer entry with the specified <code>deadline</code> and <code>id</code>.
+ */
+ public boolean remove(long deadline, int id) {
+ long[] deadlines = this.deadlines;
+ int[] ids = this.ids;
+
+ int size = this.size;
+ int i = -1;
+
+ for (int j = 0; j < size; j++) {
+ if (deadlines[j] == deadline && ids[j] == id) {
+ i = j;
+ break;
+ }
+ }
+
+ if (i < 0) {
+ return false;
+ }
+
+ Runnable[] tasks = this.tasks;
+ int s = size - 1;
+ this.size = s;
+
+ if (s == i) {
+ tasks[i] = null;
+ } else {
+ long movedDeadline = deadlines[s];
+ int movedId = ids[s];
+ Runnable movedTask = tasks[s];
+
+ tasks[s] = null;
+
+ siftDown(deadlines, ids, tasks, i, s, movedDeadline, movedId, movedTask);
+ if (tasks[i] == movedTask) {
+ siftUp(deadlines, ids, tasks, i, movedDeadline, movedId, movedTask);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Increases the capacity of the priority queue.
+ */
+ private void grow() {
+ int oldCapacity = deadlines.length;
+
+ // Double size if small; else grow by 50%
+ int newCapacity = oldCapacity + oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1;
+
+ deadlines = Arrays.copyOf(deadlines, newCapacity);
+ ids = Arrays.copyOf(ids, newCapacity);
+ tasks = Arrays.copyOf(tasks, newCapacity);
+ }
+
+ /**
+ * Sift up an entry in the heap.
+ */
+ private static void siftUp(long[] deadlines, int[] ids, Runnable[] tasks, int k, long deadline, int id, Runnable task) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ long parentDeadline = deadlines[parent];
+ int parentId = ids[parent];
+
+ if (compare(deadline, id, parentDeadline, parentId) >= 0) {
+ break;
+ }
+
+ deadlines[k] = parentDeadline;
+ ids[k] = parentId;
+ tasks[k] = tasks[parent];
+
+ k = parent;
+ }
+
+ deadlines[k] = deadline;
+ ids[k] = id;
+ tasks[k] = task;
+ }
+
+ /**
+ * Sift down an entry in the heap.
+ */
+ private static void siftDown(long[] deadlines, int[] ids, Runnable[] tasks, int k, int n, long deadline, int id, Runnable task) {
+ int half = n >>> 1; // loop while a non-leaf
+
+ while (k < half) {
+ int child = (k << 1) + 1; // assume left child is least
+
+ long childDeadline = deadlines[child];
+ int childId = ids[child];
+
+ int right = child + 1;
+ if (right < n) {
+ long rightDeadline = deadlines[right];
+ int rightId = ids[right];
+
+ if (compare(childDeadline, childId, rightDeadline, rightId) > 0) {
+ child = right;
+ childDeadline = rightDeadline;
+ childId = rightId;
+ }
+ }
+
+ if (compare(deadline, id, childDeadline, childId) <= 0) {
+ break;
+ }
+
+ deadlines[k] = childDeadline;
+ ids[k] = childId;
+ tasks[k] = tasks[child];
+
+ k = child;
+ }
+
+ deadlines[k] = deadline;
+ ids[k] = id;
+ tasks[k] = task;
+ }
+
+ /**
+ * Helper method to compare two task entries.
+ */
+ private static int compare(long leftDeadline, int leftId, long rightDeadline, int rightId) {
+ int cmp = Long.compare(leftDeadline, rightDeadline);
+ return cmp == 0 ? Integer.compare(leftId, rightId) : cmp;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt
new file mode 100644
index 00000000..eca3b582
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import java.time.Instant
+
+/**
+ * Test suite for the [SimulationScheduler] class.
+ */
+class SimulationSchedulerTest {
+ /**
+ * Test the basic functionality of [SimulationScheduler.runCurrent].
+ */
+ @Test
+ fun testRunCurrent() {
+ val scheduler = SimulationScheduler()
+ var count = 0
+
+ scheduler.schedule(1) { count += 1 }
+ scheduler.schedule(2) { count += 1 }
+
+ scheduler.advanceBy(1)
+ assertEquals(0, count)
+ scheduler.runCurrent()
+ assertEquals(1, count)
+ scheduler.advanceBy(1)
+ assertEquals(1, count)
+ scheduler.runCurrent()
+ assertEquals(2, count)
+ assertEquals(2, scheduler.currentTime)
+
+ scheduler.advanceBy(Long.MAX_VALUE)
+ scheduler.runCurrent()
+ assertEquals(Long.MAX_VALUE, scheduler.currentTime)
+ }
+
+ /**
+ * Test the clock of the [SimulationScheduler].
+ */
+ @Test
+ fun testClock() {
+ val scheduler = SimulationScheduler()
+ var count = 0
+
+ scheduler.schedule(1) { count += 1 }
+ scheduler.schedule(2) { count += 1 }
+
+ scheduler.advanceBy(2)
+ assertEquals(2, scheduler.currentTime)
+ assertEquals(2, scheduler.clock.millis())
+ assertEquals(Instant.ofEpochMilli(2), scheduler.clock.instant())
+ }
+
+ /**
+ * Test large delays.
+ */
+ @Test
+ fun testAdvanceByLargeDelays() {
+ val scheduler = SimulationScheduler()
+ var count = 0
+
+ scheduler.schedule(1) { count += 1 }
+
+ scheduler.advanceBy(10)
+
+ scheduler.schedule(Long.MAX_VALUE) { count += 1 }
+ scheduler.schedule(100_000_000) { count += 1 }
+
+ scheduler.advanceUntilIdle()
+ assertEquals(3, count)
+ }
+
+ /**
+ * Test negative delays.
+ */
+ @Test
+ fun testNegativeDelays() {
+ val scheduler = SimulationScheduler()
+
+ assertThrows<IllegalArgumentException> { scheduler.schedule(-100) { } }
+ assertThrows<IllegalArgumentException> { scheduler.advanceBy(-100) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt
new file mode 100644
index 00000000..a4d779cb
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt
@@ -0,0 +1,231 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [TaskQueue] class.
+ */
+class TaskQueueTest {
+ private lateinit var queue: TaskQueue
+
+ @BeforeEach
+ fun setUp() {
+ queue = TaskQueue(3)
+ }
+
+ /**
+ * Test whether a call to [TaskQueue.poll] returns `null` for an empty queue.
+ */
+ @Test
+ fun testPollEmpty() {
+ assertAll(
+ { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test whether a call to [TaskQueue.poll] returns the proper value for a queue with a single entry.
+ */
+ @Test
+ fun testSingleEntry() {
+ val entry = Runnable {}
+
+ queue.add(100, 1, entry)
+
+ assertAll(
+ { assertEquals(100, queue.peekDeadline()) },
+ { assertEquals(entry, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test whether [TaskQueue.poll] returns values in the queue in the proper order.
+ */
+ @Test
+ fun testMultipleEntries() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(48, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ assertAll(
+ { assertEquals(48, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test whether [TaskQueue.poll] returns values in the queue in the proper order with duplicates.
+ */
+ @Test
+ fun testMultipleEntriesDuplicate() {
+ val entryA = Runnable {}
+ queue.add(48, 0, entryA)
+
+ val entryB = Runnable {}
+ queue.add(48, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(48, 2, entryC)
+
+ assertAll(
+ { assertEquals(48, queue.peekDeadline()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that the queue is properly resized when the number of entries exceed the capacity.
+ */
+ @Test
+ fun testResize() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ val entryD = Runnable {}
+ queue.add(38, 1, entryD)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryD, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the end of the queue.
+ */
+ @Test
+ fun testRemoveEntryTail() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ queue.remove(100, 1)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the head of the queue.
+ */
+ @Test
+ fun testRemoveEntryHead() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ queue.remove(20, 1)
+
+ assertAll(
+ { assertEquals(58, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the middle of a queue.
+ */
+ @Test
+ fun testRemoveEntryMiddle() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ queue.remove(58, 1)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can "remove" an unknown entry without error.
+ */
+ @Test
+ fun testRemoveUnknown() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ assertAll(
+ { assertFalse(queue.remove(10, 1)) },
+ { assertFalse(queue.remove(58, 2)) }
+ )
+ }
+}