summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-core/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-05 14:44:43 +0200
committerGitHub <noreply@github.com>2022-10-05 14:44:43 +0200
commitc2047d09b27b0c05f5c203509dde524e17d3b729 (patch)
tree3903d8aed5e87850c92e1b2dce8379ea99bdfa6d /opendc-simulator/opendc-simulator-core/src
parentec3b5b462c1b8296ba18a3872f56d569fa70e45b (diff)
parentbe176910eb870209576326ffaad8bf21241fccbd (diff)
merge: Extract scheduler from simulation coroutine dispatcher (#106)
This pull request extracts the scheduler from the `SimulationCoroutineDispatcher` into a separate `SimulationScheduler` class which allows users to re-use the scheduler between different coroutine dispatchers. We implement the `SimulationScheduler` in Java, removing the explicit dependency on Kotlin or `kotlinx-coroutines`. The scheduler uses a separate specialized priority queue implementation that eliminates allocation in the hot path of the simulator. ## Implementation Notes :hammer_and_pick: * Add Java-based simulator core * Use SimulationScheduler in coroutine dispatcher * Rename runBlockingSimulation to runSimulation ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * The Kotlin API for simulation has been moved to `org.opendc.simulator.kotlin`. * `runBlockingSImulation` renamed to `runSimulation`
Diffstat (limited to 'opendc-simulator/opendc-simulator-core/src')
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java246
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java265
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt167
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt)51
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt)8
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt94
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt)12
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt106
-rw-r--r--opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt231
9 files changed, 996 insertions, 184 deletions
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
new file mode 100644
index 00000000..a70c1cda
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.concurrent.Executor;
+
+/**
+ * A scheduler is used by simulations to manage execution of (future) tasks, providing a controllable (virtual) clock to
+ * skip over delays.
+ *
+ * <p>
+ * The scheduler can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
+ * virtual time as needed (via {@link #advanceUntilIdle}), or run the tasks that are scheduled to run as soon as
+ * possible but have not yet been dispatched (via {@link #runCurrent}). These methods execute the pending tasks using
+ * a single thread.
+ *
+ * <p>
+ * This class is not thread-safe and must not be used concurrently by multiple threads.
+ */
+public final class SimulationScheduler implements Executor {
+ /**
+ * The {@link TaskQueue} containing the pending tasks.
+ */
+ private final TaskQueue queue = new TaskQueue();
+
+ /**
+ * The current time of the scheduler in milliseconds since epoch.
+ */
+ private long currentTime;
+
+ /**
+ * A counter to establish total order on the events that happen at the same virtual time.
+ */
+ private int count = 0;
+
+ /**
+ * The {@link Clock} instance linked to this scheduler.
+ */
+ private final SimulationClock clock = new SimulationClock(this, ZoneId.systemDefault());
+
+ /**
+ * Construct a {@link SimulationScheduler} instance with the specified initial time.
+ *
+ * @param initialTimeMs The initial virtual time of the scheduler in milliseconds since epoch.
+ */
+ public SimulationScheduler(long initialTimeMs) {
+ this.currentTime = initialTimeMs;
+ }
+
+ /**
+ * Construct a {@link SimulationScheduler} instance with the initial time set to UNIX Epoch 0.
+ */
+ public SimulationScheduler() {
+ this(0);
+ }
+
+ /**
+ * Return the virtual clock associated with this dispatcher.
+ *
+ * @return A {@link Clock} tracking the virtual time of the dispatcher.
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Return the current virtual timestamp of the dispatcher (in milliseconds since epoch).
+ *
+ * @return A long value representing the virtual timestamp of the dispatcher in milliseconds since epoch.
+ */
+ public long getCurrentTime() {
+ return currentTime;
+ }
+
+ /**
+ * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
+ *
+ * @param delayMs The time from now until the execution of the task (in milliseconds).
+ * @param task The task to execute after the delay.
+ * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
+ */
+ public int schedule(long delayMs, Runnable task) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ int id = count++;
+ queue.add(target, id, task);
+ return id;
+ }
+
+ /**
+ * Cancel a pending task.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task (returned by {@link #schedule(long, Runnable)}).
+ * @return A boolean indicating whether a task was actually cancelled.
+ */
+ public boolean cancel(long deadline, int id) {
+ return queue.remove(deadline, id);
+ }
+
+ /**
+ * Run the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more
+ * tasks in the queue of this scheduler.
+ */
+ public void advanceUntilIdle() {
+ final TaskQueue queue = this.queue;
+
+ while (true) {
+ long deadline = queue.peekDeadline();
+ Runnable task = queue.poll();
+
+ if (task == null) {
+ break;
+ }
+
+ currentTime = deadline;
+ task.run();
+ }
+ }
+
+ /**
+ * Move the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the
+ * meantime.
+ *
+ * @param delayMs The amount of time to move the virtual clock forward (in milliseconds).
+ * @throws IllegalStateException if passed a negative <code>delay</code>.
+ */
+ public void advanceBy(long delayMs) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("Can not advance time by a negative delay: " + delayMs + " ms");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ final TaskQueue queue = this.queue;
+ long deadline;
+
+ while ((deadline = queue.peekDeadline()) < target) {
+ Runnable task = queue.poll(); // Cannot be null since while condition is always false on an empty queue
+
+ task.run();
+ currentTime = deadline;
+ }
+
+ currentTime = target;
+ }
+
+ /**
+ * Execute the tasks that are scheduled to execute at this moment of virtual time.
+ */
+ public void runCurrent() {
+ final TaskQueue queue = this.queue;
+ long currentTime = this.currentTime;
+
+ while (queue.peekDeadline() == currentTime) {
+ Runnable task = queue.poll();
+
+ if (task == null) {
+ break;
+ }
+
+ task.run();
+ }
+ }
+
+ /**
+ * Schedule the specified command to run at this moment of virtual time.
+ *
+ * @param command The command to execute.
+ */
+ @Override
+ public void execute(Runnable command) {
+ schedule(0, command);
+ }
+
+ /**
+ * A {@link Clock} implementation for a {@link SimulationScheduler}.
+ */
+ private static class SimulationClock extends Clock {
+ private final SimulationScheduler scheduler;
+ private final ZoneId zone;
+
+ SimulationClock(SimulationScheduler scheduler, ZoneId zone) {
+ this.scheduler = scheduler;
+ this.zone = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zone;
+ }
+
+ @Override
+ public Clock withZone(ZoneId zoneId) {
+ return new SimulationClock(scheduler, zone);
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(scheduler.currentTime);
+ }
+
+ @Override
+ public long millis() {
+ return scheduler.currentTime;
+ }
+
+ @Override
+ public String toString() {
+ return "SimulationClock[time=" + millis() + "ms]";
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java
new file mode 100644
index 00000000..7d867b5d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.util.Arrays;
+
+/**
+ * Specialized priority queue for pending tasks.
+ *
+ * <p>
+ * This class uses a specialized priority queue (as opposed to a generic {@link java.util.PriorityQueue}), which reduces
+ * unnecessary allocations in the simulator's hot path.
+ */
+final class TaskQueue {
+ /**
+ * The deadlines of the pending tasks.
+ */
+ private long[] deadlines;
+
+ /**
+ * The identifiers of the pending tasks. Identifiers are used to provide a total order for pending tasks in case
+ * the deadline of two tasks is the same.
+ */
+ private int[] ids;
+
+ /**
+ * The {@link Runnable}s representing the tasks that have been scheduled.
+ */
+ private Runnable[] tasks;
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private int size = 0;
+
+ /**
+ * Construct a {@link TaskQueue} with the specified initial capacity.
+ *
+ * @param initialCapacity The initial capacity of the queue.
+ */
+ public TaskQueue(int initialCapacity) {
+ this.deadlines = new long[initialCapacity];
+ this.ids = new int[initialCapacity];
+ this.tasks = new Runnable[initialCapacity];
+ }
+
+ /**
+ * Construct a {@link TaskQueue} with an initial capacity of 256 elements.
+ */
+ public TaskQueue() {
+ this(256);
+ }
+
+ /**
+ * Add a new task to this queue.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task.
+ * @param task The {@link Runnable} representing the task to execute.
+ */
+ public void add(long deadline, int id, Runnable task) {
+ int i = size;
+ long[] deadlines = this.deadlines;
+
+ if (i >= deadlines.length) {
+ grow();
+
+ // Re-fetch the resized array
+ deadlines = this.deadlines;
+ }
+
+ siftUp(deadlines, ids, tasks, i, deadline, id, task);
+
+ size = i + 1;
+ }
+
+ /**
+ * Retrieve the next task to be executed.
+ *
+ * @return The head of the queue or <code>null</code> if the queue is empty.
+ */
+ public Runnable poll() {
+ final Runnable[] tasks = this.tasks;
+ final Runnable result = tasks[0];
+
+ if (result != null) {
+ int n = --size;
+
+ if (n > 0) {
+ long[] deadlines = this.deadlines;
+ int[] ids = this.ids;
+
+ siftDown(deadlines, ids, tasks, 0, n, deadlines[n], ids[n], tasks[n]);
+ }
+
+ // Clear the last element of the queue
+ tasks[n] = null;
+ }
+
+ return result;
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ *
+ * @return The earliest deadline in the queue or {@link Long#MAX_VALUE} if the queue is empty.
+ */
+ public long peekDeadline() {
+ if (size == 0) {
+ return Long.MAX_VALUE;
+ }
+
+ return deadlines[0];
+ }
+
+ /**
+ * Remove the timer entry with the specified <code>deadline</code> and <code>id</code>.
+ */
+ public boolean remove(long deadline, int id) {
+ long[] deadlines = this.deadlines;
+ int[] ids = this.ids;
+
+ int size = this.size;
+ int i = -1;
+
+ for (int j = 0; j < size; j++) {
+ if (deadlines[j] == deadline && ids[j] == id) {
+ i = j;
+ break;
+ }
+ }
+
+ if (i < 0) {
+ return false;
+ }
+
+ Runnable[] tasks = this.tasks;
+ int s = size - 1;
+ this.size = s;
+
+ if (s == i) {
+ tasks[i] = null;
+ } else {
+ long movedDeadline = deadlines[s];
+ int movedId = ids[s];
+ Runnable movedTask = tasks[s];
+
+ tasks[s] = null;
+
+ siftDown(deadlines, ids, tasks, i, s, movedDeadline, movedId, movedTask);
+ if (tasks[i] == movedTask) {
+ siftUp(deadlines, ids, tasks, i, movedDeadline, movedId, movedTask);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Increases the capacity of the priority queue.
+ */
+ private void grow() {
+ int oldCapacity = deadlines.length;
+
+ // Double size if small; else grow by 50%
+ int newCapacity = oldCapacity + oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1;
+
+ deadlines = Arrays.copyOf(deadlines, newCapacity);
+ ids = Arrays.copyOf(ids, newCapacity);
+ tasks = Arrays.copyOf(tasks, newCapacity);
+ }
+
+ /**
+ * Sift up an entry in the heap.
+ */
+ private static void siftUp(long[] deadlines, int[] ids, Runnable[] tasks, int k, long deadline, int id, Runnable task) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ long parentDeadline = deadlines[parent];
+ int parentId = ids[parent];
+
+ if (compare(deadline, id, parentDeadline, parentId) >= 0) {
+ break;
+ }
+
+ deadlines[k] = parentDeadline;
+ ids[k] = parentId;
+ tasks[k] = tasks[parent];
+
+ k = parent;
+ }
+
+ deadlines[k] = deadline;
+ ids[k] = id;
+ tasks[k] = task;
+ }
+
+ /**
+ * Sift down an entry in the heap.
+ */
+ private static void siftDown(long[] deadlines, int[] ids, Runnable[] tasks, int k, int n, long deadline, int id, Runnable task) {
+ int half = n >>> 1; // loop while a non-leaf
+
+ while (k < half) {
+ int child = (k << 1) + 1; // assume left child is least
+
+ long childDeadline = deadlines[child];
+ int childId = ids[child];
+
+ int right = child + 1;
+ if (right < n) {
+ long rightDeadline = deadlines[right];
+ int rightId = ids[right];
+
+ if (compare(childDeadline, childId, rightDeadline, rightId) > 0) {
+ child = right;
+ childDeadline = rightDeadline;
+ childId = rightId;
+ }
+ }
+
+ if (compare(deadline, id, childDeadline, childId) <= 0) {
+ break;
+ }
+
+ deadlines[k] = childDeadline;
+ ids[k] = childId;
+ tasks[k] = tasks[child];
+
+ k = child;
+ }
+
+ deadlines[k] = deadline;
+ ids[k] = id;
+ tasks[k] = task;
+ }
+
+ /**
+ * Helper method to compare two task entries.
+ */
+ private static int compare(long leftDeadline, int leftId, long rightDeadline, int rightId) {
+ int cmp = Long.compare(leftDeadline, rightDeadline);
+ return cmp == 0 ? Integer.compare(leftId, rightId) : cmp;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt
deleted file mode 100644
index 908e902a..00000000
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.core
-
-import kotlinx.coroutines.*
-import java.lang.Runnable
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
-import java.util.*
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual
- * clock for time management.
- */
-@OptIn(InternalCoroutinesApi::class)
-public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay {
- /**
- * Queue of ordered tasks to run.
- */
- private val queue = PriorityQueue<TimedRunnable>()
-
- /**
- * Global order counter.
- */
- private var _counter = 0L
-
- /**
- * The current virtual time of simulation
- */
- private var _clock = SimClock()
-
- /**
- * The virtual clock of this dispatcher.
- */
- override val clock: Clock = ClockAdapter(_clock)
-
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- block.run()
- }
-
- override fun dispatchYield(context: CoroutineContext, block: Runnable) {
- post(block)
- }
-
- @OptIn(ExperimentalCoroutinesApi::class)
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- postDelayed(CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }, timeMillis)
- }
-
- override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
- val node = postDelayed(block, timeMillis)
- return object : DisposableHandle {
- override fun dispose() {
- queue.remove(node)
- }
- }
- }
-
- override fun toString(): String {
- return "SimulationCoroutineDispatcher[time=${_clock.time}ms, queued=${queue.size}]"
- }
-
- private fun post(block: Runnable) =
- queue.add(TimedRunnable(block, _counter++))
-
- private fun postDelayed(block: Runnable, delayTime: Long) =
- TimedRunnable(block, _counter++, safePlus(_clock.time, delayTime))
- .also {
- queue.add(it)
- }
-
- private fun safePlus(currentTime: Long, delayTime: Long): Long {
- check(delayTime >= 0)
- val result = currentTime + delayTime
- if (result < currentTime) return Long.MAX_VALUE // clamp on overflow
- return result
- }
-
- override fun advanceUntilIdle(): Long {
- val queue = queue
- val clock = _clock
- val oldTime = clock.time
-
- while (true) {
- val current = queue.poll() ?: break
-
- // If the scheduled time is 0 (immediate) use current virtual time
- if (current.time != 0L) {
- clock.time = current.time
- }
-
- current.run()
- }
-
- return clock.time - oldTime
- }
-
- /**
- * A helper class that holds the time of the simulation.
- */
- private class SimClock(@JvmField var time: Long = 0)
-
- /**
- * A helper class to expose a [Clock] instance for this dispatcher.
- */
- private class ClockAdapter(private val clock: SimClock, private val zone: ZoneId = ZoneId.systemDefault()) : Clock() {
- override fun getZone(): ZoneId = zone
-
- override fun withZone(zone: ZoneId): Clock = ClockAdapter(clock, zone)
-
- override fun instant(): Instant = Instant.ofEpochMilli(millis())
-
- override fun millis(): Long = clock.time
-
- override fun toString(): String = "SimulationCoroutineDispatcher.ClockAdapter[time=${clock.time}]"
- }
-
- /**
- * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled
- * in the future.
- */
- private class CancellableContinuationRunnable<T>(
- @JvmField val continuation: CancellableContinuation<T>,
- private val block: CancellableContinuation<T>.() -> Unit
- ) : Runnable {
- override fun run() = continuation.block()
- }
-
- /**
- * A Runnable for our event loop that represents a task to perform at a time.
- */
- private class TimedRunnable(
- @JvmField val runnable: Runnable,
- private val count: Long = 0,
- @JvmField val time: Long = 0
- ) : Comparable<TimedRunnable>, Runnable by runnable {
- override fun compareTo(other: TimedRunnable) = if (time == other.time) {
- count.compareTo(other.count)
- } else {
- time.compareTo(other.time)
- }
-
- override fun toString() = "TimedRunnable[time=$time, run=$runnable]"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
index 9b284c11..c4cc0171 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt
@@ -20,19 +20,46 @@
* SOFTWARE.
*/
-package org.opendc.simulator.core
+package org.opendc.simulator.kotlin
import kotlinx.coroutines.*
+import org.opendc.simulator.SimulationScheduler
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
- * Executes a [body] inside an immediate execution dispatcher.
+ * Executes [body] as a simulation in a new coroutine.
+ *
+ * This function behaves similarly to [runBlocking], with the difference that the code that it runs will skip delays.
+ * This allows to use [delay] in without causing the simulation to take more time than necessary.
+ *
+ * ```
+ * @Test
+ * fun exampleSimulation() = runSimulation {
+ * val deferred = async {
+ * delay(1_000)
+ * async {
+ * delay(1_000)
+ * }.await()
+ * }
+ *
+ * deferred.await() // result available immediately
+ * }
+ * ```
+ *
+ * The simulation is run in a single thread, unless other [CoroutineDispatcher] are used for child coroutines.
+ * Because of this, child coroutines are not executed in parallel to [body].
+ * In order for the spawned-off asynchronous code to actually be executed, one must either [yield] or suspend the
+ * body some other way, or use commands that control scheduling (see [SimulationScheduler]).
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineContext, body: suspend SimulationCoroutineScope.() -> Unit) {
- val (safeContext, dispatcher) = context.checkArguments()
+public fun runSimulation(
+ context: CoroutineContext = EmptyCoroutineContext,
+ scheduler: SimulationScheduler = SimulationScheduler(),
+ body: suspend SimulationCoroutineScope.() -> Unit
+) {
+ val (safeContext, dispatcher) = context.checkArguments(scheduler)
val startingJobs = safeContext.activeJobs()
val scope = SimulationCoroutineScope(safeContext)
val deferred = scope.async {
@@ -49,21 +76,21 @@ public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineConte
}
/**
- * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope].
+ * Convenience method for calling [runSimulation] on an existing [SimulationCoroutineScope].
*/
-public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
- runBlockingSimulation(coroutineContext, block)
+public fun SimulationCoroutineScope.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
+ runSimulation(coroutineContext, scheduler, block)
/**
- * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher].
+ * Convenience method for calling [runSimulation] on an existing [SimulationCoroutineDispatcher].
*/
-public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
- runBlockingSimulation(this, block)
+public fun SimulationCoroutineDispatcher.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit =
+ runSimulation(this, scheduler, block)
-private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, SimulationController> {
+private fun CoroutineContext.checkArguments(scheduler: SimulationScheduler): Pair<CoroutineContext, SimulationController> {
val dispatcher = get(ContinuationInterceptor).run {
this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } }
- this ?: SimulationCoroutineDispatcher()
+ this ?: SimulationCoroutineDispatcher(scheduler)
}
val job = get(Job) ?: SupervisorJob()
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
index 2b670b91..f96b2326 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt
@@ -20,9 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.simulator.core
+package org.opendc.simulator.kotlin
import kotlinx.coroutines.CoroutineDispatcher
+import org.opendc.simulator.SimulationScheduler
import java.time.Clock
/**
@@ -35,6 +36,11 @@ public interface SimulationController {
public val clock: Clock
/**
+ * The [SimulationScheduler] driving the simulation.
+ */
+ public val scheduler: SimulationScheduler
+
+ /**
* Immediately execute all pending tasks and advance the virtual clock-time to the last delay.
*
* If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle`
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt
new file mode 100644
index 00000000..21ad1a86
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.kotlin
+
+import kotlinx.coroutines.*
+import org.opendc.simulator.SimulationScheduler
+import java.lang.Runnable
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual
+ * clock for time management.
+ *
+ * @param scheduler The [SimulationScheduler] used to manage the execution of future tasks.
+ */
+@OptIn(InternalCoroutinesApi::class)
+public class SimulationCoroutineDispatcher(
+ override val scheduler: SimulationScheduler = SimulationScheduler()
+) : CoroutineDispatcher(), SimulationController, Delay {
+ /**
+ * The virtual clock of this dispatcher.
+ */
+ override val clock: Clock = scheduler.clock
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ block.run()
+ }
+
+ override fun dispatchYield(context: CoroutineContext, block: Runnable) {
+ scheduler.execute(block)
+ }
+
+ @OptIn(ExperimentalCoroutinesApi::class)
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ scheduler.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) })
+ }
+
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
+ return object : DisposableHandle {
+ private val deadline = (scheduler.currentTime + timeMillis).let { if (it >= 0) it else Long.MAX_VALUE }
+ private val id = scheduler.schedule(timeMillis, block)
+
+ override fun dispose() {
+ scheduler.cancel(deadline, id)
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "SimulationCoroutineDispatcher[time=${scheduler.currentTime}ms]"
+ }
+
+ override fun advanceUntilIdle(): Long {
+ val scheduler = scheduler
+ val oldTime = scheduler.currentTime
+
+ scheduler.advanceUntilIdle()
+
+ return scheduler.currentTime - oldTime
+ }
+
+ /**
+ * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled
+ * in the future.
+ */
+ private class CancellableContinuationRunnable<T>(
+ @JvmField val continuation: CancellableContinuation<T>,
+ private val block: CancellableContinuation<T>.() -> Unit
+ ) : Runnable {
+ override fun run() = continuation.block()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
index 1da7f0fa..6be8e67a 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt
@@ -20,16 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.simulator.core
+package org.opendc.simulator.kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
+import org.opendc.simulator.SimulationScheduler
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
- * A scope which provides detailed control over the execution of coroutines for simulations.
+ * A scope which provides detailed control over the execution of coroutines for simulations.
*/
public interface SimulationCoroutineScope : CoroutineScope, SimulationController
@@ -46,9 +47,12 @@ private class SimulationCoroutineScopeImpl(
* scope adds [SimulationCoroutineDispatcher] automatically.
*/
@Suppress("FunctionName")
-public fun SimulationCoroutineScope(context: CoroutineContext = EmptyCoroutineContext): SimulationCoroutineScope {
+public fun SimulationCoroutineScope(
+ context: CoroutineContext = EmptyCoroutineContext,
+ scheduler: SimulationScheduler = SimulationScheduler()
+): SimulationCoroutineScope {
var safeContext = context
- if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher()
+ if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher(scheduler)
return SimulationCoroutineScopeImpl(safeContext)
}
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt
new file mode 100644
index 00000000..eca3b582
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import java.time.Instant
+
+/**
+ * Test suite for the [SimulationScheduler] class.
+ */
+class SimulationSchedulerTest {
+ /**
+ * Test the basic functionality of [SimulationScheduler.runCurrent].
+ */
+ @Test
+ fun testRunCurrent() {
+ val scheduler = SimulationScheduler()
+ var count = 0
+
+ scheduler.schedule(1) { count += 1 }
+ scheduler.schedule(2) { count += 1 }
+
+ scheduler.advanceBy(1)
+ assertEquals(0, count)
+ scheduler.runCurrent()
+ assertEquals(1, count)
+ scheduler.advanceBy(1)
+ assertEquals(1, count)
+ scheduler.runCurrent()
+ assertEquals(2, count)
+ assertEquals(2, scheduler.currentTime)
+
+ scheduler.advanceBy(Long.MAX_VALUE)
+ scheduler.runCurrent()
+ assertEquals(Long.MAX_VALUE, scheduler.currentTime)
+ }
+
+ /**
+ * Test the clock of the [SimulationScheduler].
+ */
+ @Test
+ fun testClock() {
+ val scheduler = SimulationScheduler()
+ var count = 0
+
+ scheduler.schedule(1) { count += 1 }
+ scheduler.schedule(2) { count += 1 }
+
+ scheduler.advanceBy(2)
+ assertEquals(2, scheduler.currentTime)
+ assertEquals(2, scheduler.clock.millis())
+ assertEquals(Instant.ofEpochMilli(2), scheduler.clock.instant())
+ }
+
+ /**
+ * Test large delays.
+ */
+ @Test
+ fun testAdvanceByLargeDelays() {
+ val scheduler = SimulationScheduler()
+ var count = 0
+
+ scheduler.schedule(1) { count += 1 }
+
+ scheduler.advanceBy(10)
+
+ scheduler.schedule(Long.MAX_VALUE) { count += 1 }
+ scheduler.schedule(100_000_000) { count += 1 }
+
+ scheduler.advanceUntilIdle()
+ assertEquals(3, count)
+ }
+
+ /**
+ * Test negative delays.
+ */
+ @Test
+ fun testNegativeDelays() {
+ val scheduler = SimulationScheduler()
+
+ assertThrows<IllegalArgumentException> { scheduler.schedule(-100) { } }
+ assertThrows<IllegalArgumentException> { scheduler.advanceBy(-100) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt
new file mode 100644
index 00000000..a4d779cb
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt
@@ -0,0 +1,231 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [TaskQueue] class.
+ */
+class TaskQueueTest {
+ private lateinit var queue: TaskQueue
+
+ @BeforeEach
+ fun setUp() {
+ queue = TaskQueue(3)
+ }
+
+ /**
+ * Test whether a call to [TaskQueue.poll] returns `null` for an empty queue.
+ */
+ @Test
+ fun testPollEmpty() {
+ assertAll(
+ { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test whether a call to [TaskQueue.poll] returns the proper value for a queue with a single entry.
+ */
+ @Test
+ fun testSingleEntry() {
+ val entry = Runnable {}
+
+ queue.add(100, 1, entry)
+
+ assertAll(
+ { assertEquals(100, queue.peekDeadline()) },
+ { assertEquals(entry, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test whether [TaskQueue.poll] returns values in the queue in the proper order.
+ */
+ @Test
+ fun testMultipleEntries() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(48, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ assertAll(
+ { assertEquals(48, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test whether [TaskQueue.poll] returns values in the queue in the proper order with duplicates.
+ */
+ @Test
+ fun testMultipleEntriesDuplicate() {
+ val entryA = Runnable {}
+ queue.add(48, 0, entryA)
+
+ val entryB = Runnable {}
+ queue.add(48, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(48, 2, entryC)
+
+ assertAll(
+ { assertEquals(48, queue.peekDeadline()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that the queue is properly resized when the number of entries exceed the capacity.
+ */
+ @Test
+ fun testResize() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ val entryD = Runnable {}
+ queue.add(38, 1, entryD)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryD, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the end of the queue.
+ */
+ @Test
+ fun testRemoveEntryTail() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ queue.remove(100, 1)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the head of the queue.
+ */
+ @Test
+ fun testRemoveEntryHead() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ queue.remove(20, 1)
+
+ assertAll(
+ { assertEquals(58, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the middle of a queue.
+ */
+ @Test
+ fun testRemoveEntryMiddle() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ queue.remove(58, 1)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll()) },
+ { assertEquals(entryA, queue.poll()) },
+ { assertNull(queue.poll()) },
+ )
+ }
+
+ /**
+ * Test that we can "remove" an unknown entry without error.
+ */
+ @Test
+ fun testRemoveUnknown() {
+ val entryA = Runnable {}
+ queue.add(100, 1, entryA)
+
+ val entryB = Runnable {}
+ queue.add(20, 1, entryB)
+
+ val entryC = Runnable {}
+ queue.add(58, 1, entryC)
+
+ assertAll(
+ { assertFalse(queue.remove(10, 1)) },
+ { assertFalse(queue.remove(58, 2)) }
+ )
+ }
+}