summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-core/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-05 11:56:06 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-05 13:43:50 +0200
commit44173c342d698441fbbcba4685c78f9bee40d138 (patch)
tree1878eb31b3156e8ba387946795a3d2789e83b78c /opendc-simulator/opendc-simulator-core/src/main
parentec3b5b462c1b8296ba18a3872f56d569fa70e45b (diff)
feat(sim/core): Add Java-based simulator core
This change introduces a new class, `SimulationScheduler`, which provides the basis for simulations in OpenDC by allowing components to schedule future tasks using delay-skipping queue and a virtual clock. This new class is written in Java to remove any dependency on the Kotlin and kotlinx-coroutines runtime when not necessary.
Diffstat (limited to 'opendc-simulator/opendc-simulator-core/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java246
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java265
2 files changed, 511 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
new file mode 100644
index 00000000..a70c1cda
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/SimulationScheduler.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.concurrent.Executor;
+
+/**
+ * A scheduler is used by simulations to manage execution of (future) tasks, providing a controllable (virtual) clock to
+ * skip over delays.
+ *
+ * <p>
+ * The scheduler can be queried to advance the time (via {@link #advanceBy}), run all the scheduled tasks advancing the
+ * virtual time as needed (via {@link #advanceUntilIdle}), or run the tasks that are scheduled to run as soon as
+ * possible but have not yet been dispatched (via {@link #runCurrent}). These methods execute the pending tasks using
+ * a single thread.
+ *
+ * <p>
+ * This class is not thread-safe and must not be used concurrently by multiple threads.
+ */
+public final class SimulationScheduler implements Executor {
+ /**
+ * The {@link TaskQueue} containing the pending tasks.
+ */
+ private final TaskQueue queue = new TaskQueue();
+
+ /**
+ * The current time of the scheduler in milliseconds since epoch.
+ */
+ private long currentTime;
+
+ /**
+ * A counter to establish total order on the events that happen at the same virtual time.
+ */
+ private int count = 0;
+
+ /**
+ * The {@link Clock} instance linked to this scheduler.
+ */
+ private final SimulationClock clock = new SimulationClock(this, ZoneId.systemDefault());
+
+ /**
+ * Construct a {@link SimulationScheduler} instance with the specified initial time.
+ *
+ * @param initialTimeMs The initial virtual time of the scheduler in milliseconds since epoch.
+ */
+ public SimulationScheduler(long initialTimeMs) {
+ this.currentTime = initialTimeMs;
+ }
+
+ /**
+ * Construct a {@link SimulationScheduler} instance with the initial time set to UNIX Epoch 0.
+ */
+ public SimulationScheduler() {
+ this(0);
+ }
+
+ /**
+ * Return the virtual clock associated with this dispatcher.
+ *
+ * @return A {@link Clock} tracking the virtual time of the dispatcher.
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Return the current virtual timestamp of the dispatcher (in milliseconds since epoch).
+ *
+ * @return A long value representing the virtual timestamp of the dispatcher in milliseconds since epoch.
+ */
+ public long getCurrentTime() {
+ return currentTime;
+ }
+
+ /**
+ * Schedule a <code>task</code> that executes after the specified <code>delayMs</code>.
+ *
+ * @param delayMs The time from now until the execution of the task (in milliseconds).
+ * @param task The task to execute after the delay.
+ * @return The identifier of the task that can be used together with the timestamp of the task to cancel it.
+ */
+ public int schedule(long delayMs, Runnable task) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("Attempted scheduling an event earlier in time (delay " + delayMs + " ms)");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ int id = count++;
+ queue.add(target, id, task);
+ return id;
+ }
+
+ /**
+ * Cancel a pending task.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task (returned by {@link #schedule(long, Runnable)}).
+ * @return A boolean indicating whether a task was actually cancelled.
+ */
+ public boolean cancel(long deadline, int id) {
+ return queue.remove(deadline, id);
+ }
+
+ /**
+ * Run the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more
+ * tasks in the queue of this scheduler.
+ */
+ public void advanceUntilIdle() {
+ final TaskQueue queue = this.queue;
+
+ while (true) {
+ long deadline = queue.peekDeadline();
+ Runnable task = queue.poll();
+
+ if (task == null) {
+ break;
+ }
+
+ currentTime = deadline;
+ task.run();
+ }
+ }
+
+ /**
+ * Move the virtual clock of this dispatcher forward by the specified amount, running the scheduled tasks in the
+ * meantime.
+ *
+ * @param delayMs The amount of time to move the virtual clock forward (in milliseconds).
+ * @throws IllegalStateException if passed a negative <code>delay</code>.
+ */
+ public void advanceBy(long delayMs) {
+ if (delayMs < 0) {
+ throw new IllegalArgumentException("Can not advance time by a negative delay: " + delayMs + " ms");
+ }
+
+ long target = currentTime + delayMs;
+ if (target < 0) {
+ target = Long.MAX_VALUE;
+ }
+
+ final TaskQueue queue = this.queue;
+ long deadline;
+
+ while ((deadline = queue.peekDeadline()) < target) {
+ Runnable task = queue.poll(); // Cannot be null since while condition is always false on an empty queue
+
+ task.run();
+ currentTime = deadline;
+ }
+
+ currentTime = target;
+ }
+
+ /**
+ * Execute the tasks that are scheduled to execute at this moment of virtual time.
+ */
+ public void runCurrent() {
+ final TaskQueue queue = this.queue;
+ long currentTime = this.currentTime;
+
+ while (queue.peekDeadline() == currentTime) {
+ Runnable task = queue.poll();
+
+ if (task == null) {
+ break;
+ }
+
+ task.run();
+ }
+ }
+
+ /**
+ * Schedule the specified command to run at this moment of virtual time.
+ *
+ * @param command The command to execute.
+ */
+ @Override
+ public void execute(Runnable command) {
+ schedule(0, command);
+ }
+
+ /**
+ * A {@link Clock} implementation for a {@link SimulationScheduler}.
+ */
+ private static class SimulationClock extends Clock {
+ private final SimulationScheduler scheduler;
+ private final ZoneId zone;
+
+ SimulationClock(SimulationScheduler scheduler, ZoneId zone) {
+ this.scheduler = scheduler;
+ this.zone = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zone;
+ }
+
+ @Override
+ public Clock withZone(ZoneId zoneId) {
+ return new SimulationClock(scheduler, zone);
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(scheduler.currentTime);
+ }
+
+ @Override
+ public long millis() {
+ return scheduler.currentTime;
+ }
+
+ @Override
+ public String toString() {
+ return "SimulationClock[time=" + millis() + "ms]";
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java
new file mode 100644
index 00000000..7d867b5d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-core/src/main/java/org/opendc/simulator/TaskQueue.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.util.Arrays;
+
+/**
+ * Specialized priority queue for pending tasks.
+ *
+ * <p>
+ * This class uses a specialized priority queue (as opposed to a generic {@link java.util.PriorityQueue}), which reduces
+ * unnecessary allocations in the simulator's hot path.
+ */
+final class TaskQueue {
+ /**
+ * The deadlines of the pending tasks.
+ */
+ private long[] deadlines;
+
+ /**
+ * The identifiers of the pending tasks. Identifiers are used to provide a total order for pending tasks in case
+ * the deadline of two tasks is the same.
+ */
+ private int[] ids;
+
+ /**
+ * The {@link Runnable}s representing the tasks that have been scheduled.
+ */
+ private Runnable[] tasks;
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private int size = 0;
+
+ /**
+ * Construct a {@link TaskQueue} with the specified initial capacity.
+ *
+ * @param initialCapacity The initial capacity of the queue.
+ */
+ public TaskQueue(int initialCapacity) {
+ this.deadlines = new long[initialCapacity];
+ this.ids = new int[initialCapacity];
+ this.tasks = new Runnable[initialCapacity];
+ }
+
+ /**
+ * Construct a {@link TaskQueue} with an initial capacity of 256 elements.
+ */
+ public TaskQueue() {
+ this(256);
+ }
+
+ /**
+ * Add a new task to this queue.
+ *
+ * @param deadline The deadline of the task.
+ * @param id The identifier of the task.
+ * @param task The {@link Runnable} representing the task to execute.
+ */
+ public void add(long deadline, int id, Runnable task) {
+ int i = size;
+ long[] deadlines = this.deadlines;
+
+ if (i >= deadlines.length) {
+ grow();
+
+ // Re-fetch the resized array
+ deadlines = this.deadlines;
+ }
+
+ siftUp(deadlines, ids, tasks, i, deadline, id, task);
+
+ size = i + 1;
+ }
+
+ /**
+ * Retrieve the next task to be executed.
+ *
+ * @return The head of the queue or <code>null</code> if the queue is empty.
+ */
+ public Runnable poll() {
+ final Runnable[] tasks = this.tasks;
+ final Runnable result = tasks[0];
+
+ if (result != null) {
+ int n = --size;
+
+ if (n > 0) {
+ long[] deadlines = this.deadlines;
+ int[] ids = this.ids;
+
+ siftDown(deadlines, ids, tasks, 0, n, deadlines[n], ids[n], tasks[n]);
+ }
+
+ // Clear the last element of the queue
+ tasks[n] = null;
+ }
+
+ return result;
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ *
+ * @return The earliest deadline in the queue or {@link Long#MAX_VALUE} if the queue is empty.
+ */
+ public long peekDeadline() {
+ if (size == 0) {
+ return Long.MAX_VALUE;
+ }
+
+ return deadlines[0];
+ }
+
+ /**
+ * Remove the timer entry with the specified <code>deadline</code> and <code>id</code>.
+ */
+ public boolean remove(long deadline, int id) {
+ long[] deadlines = this.deadlines;
+ int[] ids = this.ids;
+
+ int size = this.size;
+ int i = -1;
+
+ for (int j = 0; j < size; j++) {
+ if (deadlines[j] == deadline && ids[j] == id) {
+ i = j;
+ break;
+ }
+ }
+
+ if (i < 0) {
+ return false;
+ }
+
+ Runnable[] tasks = this.tasks;
+ int s = size - 1;
+ this.size = s;
+
+ if (s == i) {
+ tasks[i] = null;
+ } else {
+ long movedDeadline = deadlines[s];
+ int movedId = ids[s];
+ Runnable movedTask = tasks[s];
+
+ tasks[s] = null;
+
+ siftDown(deadlines, ids, tasks, i, s, movedDeadline, movedId, movedTask);
+ if (tasks[i] == movedTask) {
+ siftUp(deadlines, ids, tasks, i, movedDeadline, movedId, movedTask);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Increases the capacity of the priority queue.
+ */
+ private void grow() {
+ int oldCapacity = deadlines.length;
+
+ // Double size if small; else grow by 50%
+ int newCapacity = oldCapacity + oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1;
+
+ deadlines = Arrays.copyOf(deadlines, newCapacity);
+ ids = Arrays.copyOf(ids, newCapacity);
+ tasks = Arrays.copyOf(tasks, newCapacity);
+ }
+
+ /**
+ * Sift up an entry in the heap.
+ */
+ private static void siftUp(long[] deadlines, int[] ids, Runnable[] tasks, int k, long deadline, int id, Runnable task) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ long parentDeadline = deadlines[parent];
+ int parentId = ids[parent];
+
+ if (compare(deadline, id, parentDeadline, parentId) >= 0) {
+ break;
+ }
+
+ deadlines[k] = parentDeadline;
+ ids[k] = parentId;
+ tasks[k] = tasks[parent];
+
+ k = parent;
+ }
+
+ deadlines[k] = deadline;
+ ids[k] = id;
+ tasks[k] = task;
+ }
+
+ /**
+ * Sift down an entry in the heap.
+ */
+ private static void siftDown(long[] deadlines, int[] ids, Runnable[] tasks, int k, int n, long deadline, int id, Runnable task) {
+ int half = n >>> 1; // loop while a non-leaf
+
+ while (k < half) {
+ int child = (k << 1) + 1; // assume left child is least
+
+ long childDeadline = deadlines[child];
+ int childId = ids[child];
+
+ int right = child + 1;
+ if (right < n) {
+ long rightDeadline = deadlines[right];
+ int rightId = ids[right];
+
+ if (compare(childDeadline, childId, rightDeadline, rightId) > 0) {
+ child = right;
+ childDeadline = rightDeadline;
+ childId = rightId;
+ }
+ }
+
+ if (compare(deadline, id, childDeadline, childId) <= 0) {
+ break;
+ }
+
+ deadlines[k] = childDeadline;
+ ids[k] = childId;
+ tasks[k] = tasks[child];
+
+ k = child;
+ }
+
+ deadlines[k] = deadline;
+ ids[k] = id;
+ tasks[k] = task;
+ }
+
+ /**
+ * Helper method to compare two task entries.
+ */
+ private static int compare(long leftDeadline, int leftId, long rightDeadline, int rightId) {
+ int cmp = Long.compare(leftDeadline, rightDeadline);
+ return cmp == 0 ? Integer.compare(leftId, rightId) : cmp;
+ }
+}