From 44173c342d698441fbbcba4685c78f9bee40d138 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 5 Oct 2022 11:56:06 +0200 Subject: feat(sim/core): Add Java-based simulator core This change introduces a new class, `SimulationScheduler`, which provides the basis for simulations in OpenDC by allowing components to schedule future tasks using delay-skipping queue and a virtual clock. This new class is written in Java to remove any dependency on the Kotlin and kotlinx-coroutines runtime when not necessary. --- .../org/opendc/simulator/SimulationScheduler.java | 246 +++++++++++++++++++ .../main/java/org/opendc/simulator/TaskQueue.java | 265 +++++++++++++++++++++ .../opendc/simulator/SimulationSchedulerTest.kt | 106 +++++++++ .../kotlin/org/opendc/simulator/TaskQueueTest.kt | 231 ++++++++++++++++++ 4 files changed, 848 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java create mode 100644 opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java create mode 100644 opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt create mode 100644 opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt (limited to 'opendc-simulator/opendc-simulator-core/src') diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java new file mode 100644 index 00000000..a70c1cda --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.concurrent.Executor; + +/** + * A scheduler is used by simulations to manage execution of (future) tasks, providing a controllable (virtual) clock to + * skip over delays. + * + *

+ * The scheduler can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the + * virtual time as needed (via {@link #advanceUntilIdle}), or run the tasks that are scheduled to run as soon as + * possible but have not yet been dispatched (via {@link #runCurrent}). These methods execute the pending tasks using + * a single thread. + * + *

+ * This class is not thread-safe and must not be used concurrently by multiple threads. + */ +public final class SimulationScheduler implements Executor { + /** + * The {@link TaskQueue} containing the pending tasks. + */ + private final TaskQueue queue = new TaskQueue(); + + /** + * The current time of the scheduler in milliseconds since epoch. + */ + private long currentTime; + + /** + * A counter to establish total order on the events that happen at the same virtual time. + */ + private int count = 0; + + /** + * The {@link Clock} instance linked to this scheduler. + */ + private final SimulationClock clock = new SimulationClock(this, ZoneId.systemDefault()); + + /** + * Construct a {@link SimulationScheduler} instance with the specified initial time. + * + * @param initialTimeMs The initial virtual time of the scheduler in milliseconds since epoch. + */ + public SimulationScheduler(long initialTimeMs) { + this.currentTime = initialTimeMs; + } + + /** + * Construct a {@link SimulationScheduler} instance with the initial time set to UNIX Epoch 0. + */ + public SimulationScheduler() { + this(0); + } + + /** + * Return the virtual clock associated with this dispatcher. + * + * @return A {@link Clock} tracking the virtual time of the dispatcher. + */ + public Clock getClock() { + return clock; + } + + /** + * Return the current virtual timestamp of the dispatcher (in milliseconds since epoch). + * + * @return A long value representing the virtual timestamp of the dispatcher in milliseconds since epoch. + */ + public long getCurrentTime() { + return currentTime; + } + + /** + * Schedule a task that executes after the specified delayMs. + * + * @param delayMs The time from now until the execution of the task (in milliseconds). + * @param task The task to execute after the delay. + * @return The identifier of the task that can be used together with the timestamp of the task to cancel it. + */ + public int schedule(long delayMs, Runnable task) { + if (delayMs < 0) { + throw new IllegalArgumentException("Attempted scheduling an event earlier in time (delay " + delayMs + " ms)"); + } + + long target = currentTime + delayMs; + if (target < 0) { + target = Long.MAX_VALUE; + } + + int id = count++; + queue.add(target, id, task); + return id; + } + + /** + * Cancel a pending task. + * + * @param deadline The deadline of the task. + * @param id The identifier of the task (returned by {@link #schedule(long, Runnable)}). + * @return A boolean indicating whether a task was actually cancelled. + */ + public boolean cancel(long deadline, int id) { + return queue.remove(deadline, id); + } + + /** + * Run the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more + * tasks in the queue of this scheduler. + */ + public void advanceUntilIdle() { + final TaskQueue queue = this.queue; + + while (true) { + long deadline = queue.peekDeadline(); + Runnable task = queue.poll(); + + if (task == null) { + break; + } + + currentTime = deadline; + task.run(); + } + } + + /** + * Move the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the + * meantime. + * + * @param delayMs The amount of time to move the virtual clock forward (in milliseconds). + * @throws IllegalStateException if passed a negative delay. + */ + public void advanceBy(long delayMs) { + if (delayMs < 0) { + throw new IllegalArgumentException("Can not advance time by a negative delay: " + delayMs + " ms"); + } + + long target = currentTime + delayMs; + if (target < 0) { + target = Long.MAX_VALUE; + } + + final TaskQueue queue = this.queue; + long deadline; + + while ((deadline = queue.peekDeadline()) < target) { + Runnable task = queue.poll(); // Cannot be null since while condition is always false on an empty queue + + task.run(); + currentTime = deadline; + } + + currentTime = target; + } + + /** + * Execute the tasks that are scheduled to execute at this moment of virtual time. + */ + public void runCurrent() { + final TaskQueue queue = this.queue; + long currentTime = this.currentTime; + + while (queue.peekDeadline() == currentTime) { + Runnable task = queue.poll(); + + if (task == null) { + break; + } + + task.run(); + } + } + + /** + * Schedule the specified command to run at this moment of virtual time. + * + * @param command The command to execute. + */ + @Override + public void execute(Runnable command) { + schedule(0, command); + } + + /** + * A {@link Clock} implementation for a {@link SimulationScheduler}. + */ + private static class SimulationClock extends Clock { + private final SimulationScheduler scheduler; + private final ZoneId zone; + + SimulationClock(SimulationScheduler scheduler, ZoneId zone) { + this.scheduler = scheduler; + this.zone = zone; + } + + @Override + public ZoneId getZone() { + return zone; + } + + @Override + public Clock withZone(ZoneId zoneId) { + return new SimulationClock(scheduler, zone); + } + + @Override + public Instant instant() { + return Instant.ofEpochMilli(scheduler.currentTime); + } + + @Override + public long millis() { + return scheduler.currentTime; + } + + @Override + public String toString() { + return "SimulationClock[time=" + millis() + "ms]"; + } + } +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java new file mode 100644 index 00000000..7d867b5d --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java @@ -0,0 +1,265 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator; + +import java.util.Arrays; + +/** + * Specialized priority queue for pending tasks. + * + *

+ * This class uses a specialized priority queue (as opposed to a generic {@link java.util.PriorityQueue}), which reduces + * unnecessary allocations in the simulator's hot path. + */ +final class TaskQueue { + /** + * The deadlines of the pending tasks. + */ + private long[] deadlines; + + /** + * The identifiers of the pending tasks. Identifiers are used to provide a total order for pending tasks in case + * the deadline of two tasks is the same. + */ + private int[] ids; + + /** + * The {@link Runnable}s representing the tasks that have been scheduled. + */ + private Runnable[] tasks; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link TaskQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public TaskQueue(int initialCapacity) { + this.deadlines = new long[initialCapacity]; + this.ids = new int[initialCapacity]; + this.tasks = new Runnable[initialCapacity]; + } + + /** + * Construct a {@link TaskQueue} with an initial capacity of 256 elements. + */ + public TaskQueue() { + this(256); + } + + /** + * Add a new task to this queue. + * + * @param deadline The deadline of the task. + * @param id The identifier of the task. + * @param task The {@link Runnable} representing the task to execute. + */ + public void add(long deadline, int id, Runnable task) { + int i = size; + long[] deadlines = this.deadlines; + + if (i >= deadlines.length) { + grow(); + + // Re-fetch the resized array + deadlines = this.deadlines; + } + + siftUp(deadlines, ids, tasks, i, deadline, id, task); + + size = i + 1; + } + + /** + * Retrieve the next task to be executed. + * + * @return The head of the queue or null if the queue is empty. + */ + public Runnable poll() { + final Runnable[] tasks = this.tasks; + final Runnable result = tasks[0]; + + if (result != null) { + int n = --size; + + if (n > 0) { + long[] deadlines = this.deadlines; + int[] ids = this.ids; + + siftDown(deadlines, ids, tasks, 0, n, deadlines[n], ids[n], tasks[n]); + } + + // Clear the last element of the queue + tasks[n] = null; + } + + return result; + } + + /** + * Find the earliest deadline in the queue. + * + * @return The earliest deadline in the queue or {@link Long#MAX_VALUE} if the queue is empty. + */ + public long peekDeadline() { + if (size == 0) { + return Long.MAX_VALUE; + } + + return deadlines[0]; + } + + /** + * Remove the timer entry with the specified deadline and id. + */ + public boolean remove(long deadline, int id) { + long[] deadlines = this.deadlines; + int[] ids = this.ids; + + int size = this.size; + int i = -1; + + for (int j = 0; j < size; j++) { + if (deadlines[j] == deadline && ids[j] == id) { + i = j; + break; + } + } + + if (i < 0) { + return false; + } + + Runnable[] tasks = this.tasks; + int s = size - 1; + this.size = s; + + if (s == i) { + tasks[i] = null; + } else { + long movedDeadline = deadlines[s]; + int movedId = ids[s]; + Runnable movedTask = tasks[s]; + + tasks[s] = null; + + siftDown(deadlines, ids, tasks, i, s, movedDeadline, movedId, movedTask); + if (tasks[i] == movedTask) { + siftUp(deadlines, ids, tasks, i, movedDeadline, movedId, movedTask); + } + } + + return true; + } + + /** + * Increases the capacity of the priority queue. + */ + private void grow() { + int oldCapacity = deadlines.length; + + // Double size if small; else grow by 50% + int newCapacity = oldCapacity + oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1; + + deadlines = Arrays.copyOf(deadlines, newCapacity); + ids = Arrays.copyOf(ids, newCapacity); + tasks = Arrays.copyOf(tasks, newCapacity); + } + + /** + * Sift up an entry in the heap. + */ + private static void siftUp(long[] deadlines, int[] ids, Runnable[] tasks, int k, long deadline, int id, Runnable task) { + while (k > 0) { + int parent = (k - 1) >>> 1; + long parentDeadline = deadlines[parent]; + int parentId = ids[parent]; + + if (compare(deadline, id, parentDeadline, parentId) >= 0) { + break; + } + + deadlines[k] = parentDeadline; + ids[k] = parentId; + tasks[k] = tasks[parent]; + + k = parent; + } + + deadlines[k] = deadline; + ids[k] = id; + tasks[k] = task; + } + + /** + * Sift down an entry in the heap. + */ + private static void siftDown(long[] deadlines, int[] ids, Runnable[] tasks, int k, int n, long deadline, int id, Runnable task) { + int half = n >>> 1; // loop while a non-leaf + + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + + long childDeadline = deadlines[child]; + int childId = ids[child]; + + int right = child + 1; + if (right < n) { + long rightDeadline = deadlines[right]; + int rightId = ids[right]; + + if (compare(childDeadline, childId, rightDeadline, rightId) > 0) { + child = right; + childDeadline = rightDeadline; + childId = rightId; + } + } + + if (compare(deadline, id, childDeadline, childId) <= 0) { + break; + } + + deadlines[k] = childDeadline; + ids[k] = childId; + tasks[k] = tasks[child]; + + k = child; + } + + deadlines[k] = deadline; + ids[k] = id; + tasks[k] = task; + } + + /** + * Helper method to compare two task entries. + */ + private static int compare(long leftDeadline, int leftId, long rightDeadline, int rightId) { + int cmp = Long.compare(leftDeadline, rightDeadline); + return cmp == 0 ? Integer.compare(leftId, rightId) : cmp; + } +} diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt new file mode 100644 index 00000000..eca3b582 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/SimulationSchedulerTest.kt @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.time.Instant + +/** + * Test suite for the [SimulationScheduler] class. + */ +class SimulationSchedulerTest { + /** + * Test the basic functionality of [SimulationScheduler.runCurrent]. + */ + @Test + fun testRunCurrent() { + val scheduler = SimulationScheduler() + var count = 0 + + scheduler.schedule(1) { count += 1 } + scheduler.schedule(2) { count += 1 } + + scheduler.advanceBy(1) + assertEquals(0, count) + scheduler.runCurrent() + assertEquals(1, count) + scheduler.advanceBy(1) + assertEquals(1, count) + scheduler.runCurrent() + assertEquals(2, count) + assertEquals(2, scheduler.currentTime) + + scheduler.advanceBy(Long.MAX_VALUE) + scheduler.runCurrent() + assertEquals(Long.MAX_VALUE, scheduler.currentTime) + } + + /** + * Test the clock of the [SimulationScheduler]. + */ + @Test + fun testClock() { + val scheduler = SimulationScheduler() + var count = 0 + + scheduler.schedule(1) { count += 1 } + scheduler.schedule(2) { count += 1 } + + scheduler.advanceBy(2) + assertEquals(2, scheduler.currentTime) + assertEquals(2, scheduler.clock.millis()) + assertEquals(Instant.ofEpochMilli(2), scheduler.clock.instant()) + } + + /** + * Test large delays. + */ + @Test + fun testAdvanceByLargeDelays() { + val scheduler = SimulationScheduler() + var count = 0 + + scheduler.schedule(1) { count += 1 } + + scheduler.advanceBy(10) + + scheduler.schedule(Long.MAX_VALUE) { count += 1 } + scheduler.schedule(100_000_000) { count += 1 } + + scheduler.advanceUntilIdle() + assertEquals(3, count) + } + + /** + * Test negative delays. + */ + @Test + fun testNegativeDelays() { + val scheduler = SimulationScheduler() + + assertThrows { scheduler.schedule(-100) { } } + assertThrows { scheduler.advanceBy(-100) } + } +} diff --git a/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt new file mode 100644 index 00000000..a4d779cb --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/test/kotlin/org/opendc/simulator/TaskQueueTest.kt @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * Test suite for the [TaskQueue] class. + */ +class TaskQueueTest { + private lateinit var queue: TaskQueue + + @BeforeEach + fun setUp() { + queue = TaskQueue(3) + } + + /** + * Test whether a call to [TaskQueue.poll] returns `null` for an empty queue. + */ + @Test + fun testPollEmpty() { + assertAll( + { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test whether a call to [TaskQueue.poll] returns the proper value for a queue with a single entry. + */ + @Test + fun testSingleEntry() { + val entry = Runnable {} + + queue.add(100, 1, entry) + + assertAll( + { assertEquals(100, queue.peekDeadline()) }, + { assertEquals(entry, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test whether [TaskQueue.poll] returns values in the queue in the proper order. + */ + @Test + fun testMultipleEntries() { + val entryA = Runnable {} + queue.add(100, 1, entryA) + + val entryB = Runnable {} + queue.add(48, 1, entryB) + + val entryC = Runnable {} + queue.add(58, 1, entryC) + + assertAll( + { assertEquals(48, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll()) }, + { assertEquals(entryC, queue.poll()) }, + { assertEquals(entryA, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test whether [TaskQueue.poll] returns values in the queue in the proper order with duplicates. + */ + @Test + fun testMultipleEntriesDuplicate() { + val entryA = Runnable {} + queue.add(48, 0, entryA) + + val entryB = Runnable {} + queue.add(48, 1, entryB) + + val entryC = Runnable {} + queue.add(48, 2, entryC) + + assertAll( + { assertEquals(48, queue.peekDeadline()) }, + { assertEquals(entryA, queue.poll()) }, + { assertEquals(entryB, queue.poll()) }, + { assertEquals(entryC, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test that the queue is properly resized when the number of entries exceed the capacity. + */ + @Test + fun testResize() { + val entryA = Runnable {} + queue.add(100, 1, entryA) + + val entryB = Runnable {} + queue.add(20, 1, entryB) + + val entryC = Runnable {} + queue.add(58, 1, entryC) + + val entryD = Runnable {} + queue.add(38, 1, entryD) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll()) }, + { assertEquals(entryD, queue.poll()) }, + { assertEquals(entryC, queue.poll()) }, + { assertEquals(entryA, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test that we can remove an entry from the end of the queue. + */ + @Test + fun testRemoveEntryTail() { + val entryA = Runnable {} + queue.add(100, 1, entryA) + + val entryB = Runnable {} + queue.add(20, 1, entryB) + + val entryC = Runnable {} + queue.add(58, 1, entryC) + + queue.remove(100, 1) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll()) }, + { assertEquals(entryC, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test that we can remove an entry from the head of the queue. + */ + @Test + fun testRemoveEntryHead() { + val entryA = Runnable {} + queue.add(100, 1, entryA) + + val entryB = Runnable {} + queue.add(20, 1, entryB) + + val entryC = Runnable {} + queue.add(58, 1, entryC) + + queue.remove(20, 1) + + assertAll( + { assertEquals(58, queue.peekDeadline()) }, + { assertEquals(entryC, queue.poll()) }, + { assertEquals(entryA, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test that we can remove an entry from the middle of a queue. + */ + @Test + fun testRemoveEntryMiddle() { + val entryA = Runnable {} + queue.add(100, 1, entryA) + + val entryB = Runnable {} + queue.add(20, 1, entryB) + + val entryC = Runnable {} + queue.add(58, 1, entryC) + + queue.remove(58, 1) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll()) }, + { assertEquals(entryA, queue.poll()) }, + { assertNull(queue.poll()) }, + ) + } + + /** + * Test that we can "remove" an unknown entry without error. + */ + @Test + fun testRemoveUnknown() { + val entryA = Runnable {} + queue.add(100, 1, entryA) + + val entryB = Runnable {} + queue.add(20, 1, entryB) + + val entryC = Runnable {} + queue.add(58, 1, entryC) + + assertAll( + { assertFalse(queue.remove(10, 1)) }, + { assertFalse(queue.remove(58, 2)) } + ) + } +} -- cgit v1.2.3 From c214a7fe0d46ecc23a71f9237b20281c0ca1c929 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 5 Oct 2022 13:22:40 +0200 Subject: refactor(sim/core): Use SimulationScheduler in coroutine dispatcher This change updates the implementation of `SimulationDispatcher` to use a (possibly user-provided) `SimulationScheduler` for managing the execution of the simulation and future tasks. --- .../opendc/simulator/core/SimulationBuilders.kt | 75 --------- .../opendc/simulator/core/SimulationController.kt | 46 ------ .../core/SimulationCoroutineDispatcher.kt | 167 --------------------- .../simulator/core/SimulationCoroutineScope.kt | 62 -------- .../opendc/simulator/kotlin/SimulationBuilders.kt | 80 ++++++++++ .../simulator/kotlin/SimulationController.kt | 52 +++++++ .../kotlin/SimulationCoroutineDispatcher.kt | 94 ++++++++++++ .../simulator/kotlin/SimulationCoroutineScope.kt | 66 ++++++++ 8 files changed, 292 insertions(+), 350 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt delete mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt delete mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt delete mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt create mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt create mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt create mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt create mode 100644 opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt (limited to 'opendc-simulator/opendc-simulator-core/src') diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt deleted file mode 100644 index 9b284c11..00000000 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.core - -import kotlinx.coroutines.* -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext - -/** - * Executes a [body] inside an immediate execution dispatcher. - */ -@OptIn(ExperimentalCoroutinesApi::class) -public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineContext, body: suspend SimulationCoroutineScope.() -> Unit) { - val (safeContext, dispatcher) = context.checkArguments() - val startingJobs = safeContext.activeJobs() - val scope = SimulationCoroutineScope(safeContext) - val deferred = scope.async { - body(scope) - } - dispatcher.advanceUntilIdle() - deferred.getCompletionExceptionOrNull()?.let { - throw it - } - val endingJobs = safeContext.activeJobs() - if ((endingJobs - startingJobs).isNotEmpty()) { - throw IllegalStateException("Test finished with active jobs: $endingJobs") - } -} - -/** - * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope]. - */ -public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = - runBlockingSimulation(coroutineContext, block) - -/** - * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher]. - */ -public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = - runBlockingSimulation(this, block) - -private fun CoroutineContext.checkArguments(): Pair { - val dispatcher = get(ContinuationInterceptor).run { - this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } } - this ?: SimulationCoroutineDispatcher() - } - - val job = get(Job) ?: SupervisorJob() - return Pair(this + dispatcher + job, dispatcher as SimulationController) -} - -private fun CoroutineContext.activeJobs(): Set { - return checkNotNull(this[Job]).children.filter { it.isActive }.toSet() -} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt deleted file mode 100644 index 2b670b91..00000000 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.core - -import kotlinx.coroutines.CoroutineDispatcher -import java.time.Clock - -/** - * Control the virtual clock of a [CoroutineDispatcher]. - */ -public interface SimulationController { - /** - * The current virtual clock as it is known to this Dispatcher. - */ - public val clock: Clock - - /** - * Immediately execute all pending tasks and advance the virtual clock-time to the last delay. - * - * If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle` - * returns. - * - * @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds. - */ - public fun advanceUntilIdle(): Long -} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt deleted file mode 100644 index 908e902a..00000000 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.core - -import kotlinx.coroutines.* -import java.lang.Runnable -import java.time.Clock -import java.time.Instant -import java.time.ZoneId -import java.util.* -import kotlin.coroutines.CoroutineContext - -/** - * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual - * clock for time management. - */ -@OptIn(InternalCoroutinesApi::class) -public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay { - /** - * Queue of ordered tasks to run. - */ - private val queue = PriorityQueue() - - /** - * Global order counter. - */ - private var _counter = 0L - - /** - * The current virtual time of simulation - */ - private var _clock = SimClock() - - /** - * The virtual clock of this dispatcher. - */ - override val clock: Clock = ClockAdapter(_clock) - - override fun dispatch(context: CoroutineContext, block: Runnable) { - block.run() - } - - override fun dispatchYield(context: CoroutineContext, block: Runnable) { - post(block) - } - - @OptIn(ExperimentalCoroutinesApi::class) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - postDelayed(CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }, timeMillis) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val node = postDelayed(block, timeMillis) - return object : DisposableHandle { - override fun dispose() { - queue.remove(node) - } - } - } - - override fun toString(): String { - return "SimulationCoroutineDispatcher[time=${_clock.time}ms, queued=${queue.size}]" - } - - private fun post(block: Runnable) = - queue.add(TimedRunnable(block, _counter++)) - - private fun postDelayed(block: Runnable, delayTime: Long) = - TimedRunnable(block, _counter++, safePlus(_clock.time, delayTime)) - .also { - queue.add(it) - } - - private fun safePlus(currentTime: Long, delayTime: Long): Long { - check(delayTime >= 0) - val result = currentTime + delayTime - if (result < currentTime) return Long.MAX_VALUE // clamp on overflow - return result - } - - override fun advanceUntilIdle(): Long { - val queue = queue - val clock = _clock - val oldTime = clock.time - - while (true) { - val current = queue.poll() ?: break - - // If the scheduled time is 0 (immediate) use current virtual time - if (current.time != 0L) { - clock.time = current.time - } - - current.run() - } - - return clock.time - oldTime - } - - /** - * A helper class that holds the time of the simulation. - */ - private class SimClock(@JvmField var time: Long = 0) - - /** - * A helper class to expose a [Clock] instance for this dispatcher. - */ - private class ClockAdapter(private val clock: SimClock, private val zone: ZoneId = ZoneId.systemDefault()) : Clock() { - override fun getZone(): ZoneId = zone - - override fun withZone(zone: ZoneId): Clock = ClockAdapter(clock, zone) - - override fun instant(): Instant = Instant.ofEpochMilli(millis()) - - override fun millis(): Long = clock.time - - override fun toString(): String = "SimulationCoroutineDispatcher.ClockAdapter[time=${clock.time}]" - } - - /** - * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled - * in the future. - */ - private class CancellableContinuationRunnable( - @JvmField val continuation: CancellableContinuation, - private val block: CancellableContinuation.() -> Unit - ) : Runnable { - override fun run() = continuation.block() - } - - /** - * A Runnable for our event loop that represents a task to perform at a time. - */ - private class TimedRunnable( - @JvmField val runnable: Runnable, - private val count: Long = 0, - @JvmField val time: Long = 0 - ) : Comparable, Runnable by runnable { - override fun compareTo(other: TimedRunnable) = if (time == other.time) { - count.compareTo(other.count) - } else { - time.compareTo(other.time) - } - - override fun toString() = "TimedRunnable[time=$time, run=$runnable]" - } -} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt deleted file mode 100644 index 1da7f0fa..00000000 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.core - -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext - -/** - * A scope which provides detailed control over the execution of coroutines for simulations. - */ -public interface SimulationCoroutineScope : CoroutineScope, SimulationController - -private class SimulationCoroutineScopeImpl( - override val coroutineContext: CoroutineContext -) : - SimulationCoroutineScope, - SimulationController by coroutineContext.simulationController - -/** - * A scope which provides detailed control over the execution of coroutines for simulations. - * - * If the provided context does not provide a [ContinuationInterceptor] (Dispatcher) or [CoroutineExceptionHandler], the - * scope adds [SimulationCoroutineDispatcher] automatically. - */ -@Suppress("FunctionName") -public fun SimulationCoroutineScope(context: CoroutineContext = EmptyCoroutineContext): SimulationCoroutineScope { - var safeContext = context - if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher() - return SimulationCoroutineScopeImpl(safeContext) -} - -private inline val CoroutineContext.simulationController: SimulationController - get() { - val handler = this[ContinuationInterceptor] - return handler as? SimulationController ?: throw IllegalArgumentException( - "SimulationCoroutineScope requires a SimulationController such as SimulatorCoroutineDispatcher as " + - "the ContinuationInterceptor (Dispatcher)" - ) - } diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt new file mode 100644 index 00000000..a291b4e2 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.kotlin + +import kotlinx.coroutines.* +import org.opendc.simulator.SimulationScheduler +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Executes a [body] inside an immediate execution dispatcher. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun runBlockingSimulation( + context: CoroutineContext = EmptyCoroutineContext, + scheduler: SimulationScheduler = SimulationScheduler(), + body: suspend SimulationCoroutineScope.() -> Unit +) { + val (safeContext, dispatcher) = context.checkArguments(scheduler) + val startingJobs = safeContext.activeJobs() + val scope = SimulationCoroutineScope(safeContext) + val deferred = scope.async { + body(scope) + } + dispatcher.advanceUntilIdle() + deferred.getCompletionExceptionOrNull()?.let { + throw it + } + val endingJobs = safeContext.activeJobs() + if ((endingJobs - startingJobs).isNotEmpty()) { + throw IllegalStateException("Test finished with active jobs: $endingJobs") + } +} + +/** + * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope]. + */ +public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runBlockingSimulation(coroutineContext, scheduler, block) + +/** + * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher]. + */ +public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runBlockingSimulation(this, scheduler, block) + +private fun CoroutineContext.checkArguments(scheduler: SimulationScheduler): Pair { + val dispatcher = get(ContinuationInterceptor).run { + this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } } + this ?: SimulationCoroutineDispatcher(scheduler) + } + + val job = get(Job) ?: SupervisorJob() + return Pair(this + dispatcher + job, dispatcher as SimulationController) +} + +private fun CoroutineContext.activeJobs(): Set { + return checkNotNull(this[Job]).children.filter { it.isActive }.toSet() +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt new file mode 100644 index 00000000..f96b2326 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.kotlin + +import kotlinx.coroutines.CoroutineDispatcher +import org.opendc.simulator.SimulationScheduler +import java.time.Clock + +/** + * Control the virtual clock of a [CoroutineDispatcher]. + */ +public interface SimulationController { + /** + * The current virtual clock as it is known to this Dispatcher. + */ + public val clock: Clock + + /** + * The [SimulationScheduler] driving the simulation. + */ + public val scheduler: SimulationScheduler + + /** + * Immediately execute all pending tasks and advance the virtual clock-time to the last delay. + * + * If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle` + * returns. + * + * @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds. + */ + public fun advanceUntilIdle(): Long +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt new file mode 100644 index 00000000..21ad1a86 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.kotlin + +import kotlinx.coroutines.* +import org.opendc.simulator.SimulationScheduler +import java.lang.Runnable +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual + * clock for time management. + * + * @param scheduler The [SimulationScheduler] used to manage the execution of future tasks. + */ +@OptIn(InternalCoroutinesApi::class) +public class SimulationCoroutineDispatcher( + override val scheduler: SimulationScheduler = SimulationScheduler() +) : CoroutineDispatcher(), SimulationController, Delay { + /** + * The virtual clock of this dispatcher. + */ + override val clock: Clock = scheduler.clock + + override fun dispatch(context: CoroutineContext, block: Runnable) { + block.run() + } + + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + scheduler.execute(block) + } + + @OptIn(ExperimentalCoroutinesApi::class) + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + scheduler.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }) + } + + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + return object : DisposableHandle { + private val deadline = (scheduler.currentTime + timeMillis).let { if (it >= 0) it else Long.MAX_VALUE } + private val id = scheduler.schedule(timeMillis, block) + + override fun dispose() { + scheduler.cancel(deadline, id) + } + } + } + + override fun toString(): String { + return "SimulationCoroutineDispatcher[time=${scheduler.currentTime}ms]" + } + + override fun advanceUntilIdle(): Long { + val scheduler = scheduler + val oldTime = scheduler.currentTime + + scheduler.advanceUntilIdle() + + return scheduler.currentTime - oldTime + } + + /** + * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled + * in the future. + */ + private class CancellableContinuationRunnable( + @JvmField val continuation: CancellableContinuation, + private val block: CancellableContinuation.() -> Unit + ) : Runnable { + override fun run() = continuation.block() + } +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt new file mode 100644 index 00000000..6be8e67a --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.kotlin + +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import org.opendc.simulator.SimulationScheduler +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * A scope which provides detailed control over the execution of coroutines for simulations. + */ +public interface SimulationCoroutineScope : CoroutineScope, SimulationController + +private class SimulationCoroutineScopeImpl( + override val coroutineContext: CoroutineContext +) : + SimulationCoroutineScope, + SimulationController by coroutineContext.simulationController + +/** + * A scope which provides detailed control over the execution of coroutines for simulations. + * + * If the provided context does not provide a [ContinuationInterceptor] (Dispatcher) or [CoroutineExceptionHandler], the + * scope adds [SimulationCoroutineDispatcher] automatically. + */ +@Suppress("FunctionName") +public fun SimulationCoroutineScope( + context: CoroutineContext = EmptyCoroutineContext, + scheduler: SimulationScheduler = SimulationScheduler() +): SimulationCoroutineScope { + var safeContext = context + if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher(scheduler) + return SimulationCoroutineScopeImpl(safeContext) +} + +private inline val CoroutineContext.simulationController: SimulationController + get() { + val handler = this[ContinuationInterceptor] + return handler as? SimulationController ?: throw IllegalArgumentException( + "SimulationCoroutineScope requires a SimulationController such as SimulatorCoroutineDispatcher as " + + "the ContinuationInterceptor (Dispatcher)" + ) + } -- cgit v1.2.3 From be176910eb870209576326ffaad8bf21241fccbd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 5 Oct 2022 14:10:11 +0200 Subject: refactor(sim/core): Rename runBlockingSimulation to runSimulation This change renames the method `runBlockingSimulation` to `runSimulation` to put more emphasis on the simulation part of the method. The blocking part is not that important, but this behavior is still described in the method documentation. --- .../opendc/simulator/kotlin/SimulationBuilders.kt | 38 +++++++++++++++++----- 1 file changed, 30 insertions(+), 8 deletions(-) (limited to 'opendc-simulator/opendc-simulator-core/src') diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt index a291b4e2..c4cc0171 100644 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt @@ -29,10 +29,32 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext /** - * Executes a [body] inside an immediate execution dispatcher. + * Executes [body] as a simulation in a new coroutine. + * + * This function behaves similarly to [runBlocking], with the difference that the code that it runs will skip delays. + * This allows to use [delay] in without causing the simulation to take more time than necessary. + * + * ``` + * @Test + * fun exampleSimulation() = runSimulation { + * val deferred = async { + * delay(1_000) + * async { + * delay(1_000) + * }.await() + * } + * + * deferred.await() // result available immediately + * } + * ``` + * + * The simulation is run in a single thread, unless other [CoroutineDispatcher] are used for child coroutines. + * Because of this, child coroutines are not executed in parallel to [body]. + * In order for the spawned-off asynchronous code to actually be executed, one must either [yield] or suspend the + * body some other way, or use commands that control scheduling (see [SimulationScheduler]). */ @OptIn(ExperimentalCoroutinesApi::class) -public fun runBlockingSimulation( +public fun runSimulation( context: CoroutineContext = EmptyCoroutineContext, scheduler: SimulationScheduler = SimulationScheduler(), body: suspend SimulationCoroutineScope.() -> Unit @@ -54,16 +76,16 @@ public fun runBlockingSimulation( } /** - * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope]. + * Convenience method for calling [runSimulation] on an existing [SimulationCoroutineScope]. */ -public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = - runBlockingSimulation(coroutineContext, scheduler, block) +public fun SimulationCoroutineScope.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runSimulation(coroutineContext, scheduler, block) /** - * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher]. + * Convenience method for calling [runSimulation] on an existing [SimulationCoroutineDispatcher]. */ -public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = - runBlockingSimulation(this, scheduler, block) +public fun SimulationCoroutineDispatcher.runSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runSimulation(this, scheduler, block) private fun CoroutineContext.checkArguments(scheduler: SimulationScheduler): Pair { val dispatcher = get(ContinuationInterceptor).run { -- cgit v1.2.3