summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gradle/libs.versions.toml1
-rw-r--r--opendc-common/src/main/java/org/opendc/common/util/Pacer.java (renamed from opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt)80
-rw-r--r--opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java256
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt230
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt18
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt23
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt11
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt12
-rw-r--r--opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt3
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt6
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt11
-rw-r--r--opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt14
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt8
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt2
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt4
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt11
-rw-r--r--opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt12
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt5
-rw-r--r--opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt4
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt23
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt7
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt12
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt3
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt3
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt11
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt10
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt22
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt20
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt11
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt32
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java21
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt8
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt13
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt15
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt2
54 files changed, 518 insertions, 569 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 033e8cfb..68f3119f 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -36,6 +36,7 @@ kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core",
# Logging
kotlin-logging = { module = "io.github.microutils:kotlin-logging", version.ref = "kotlin-logging" }
+slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
log4j-core = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" }
log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" }
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java
index b6141db1..5b8d8cb0 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt
+++ b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java
@@ -20,75 +20,75 @@
* SOFTWARE.
*/
-package org.opendc.common.util
+package org.opendc.common.util;
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import java.lang.Runnable
-import java.time.InstantSource
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
+import java.util.function.LongConsumer;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
/**
* Helper class to pace the incoming scheduling requests.
- *
- * @param context The [CoroutineContext] in which the pacer runs.
- * @param clock The virtual simulation clock.
- * @param quantum The scheduling quantum.
- * @param process The process to invoke for the incoming requests.
*/
-public class Pacer(
- private val context: CoroutineContext,
- private val clock: InstantSource,
- private val quantum: Long,
- private val process: (Long) -> Unit
-) {
+public final class Pacer {
+ private final Dispatcher dispatcher;
+ private final long quantumMs;
+ private final LongConsumer process;
+
/**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ * The current {@link DispatcherHandle} representing the pending scheduling cycle.
*/
- @OptIn(InternalCoroutinesApi::class)
- private val delay =
- requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+ private DispatcherHandle handle;
/**
- * The current [DisposableHandle] representing the pending scheduling cycle.
+ * Construct a {@link Pacer} instance.
+ *
+ * @param dispatcher The {@link Dispatcher} to schedule future invocations.
+ * @param quantumMs The scheduling quantum in milliseconds.
+ * @param process The process to invoke for the incoming requests.
*/
- private var handle: DisposableHandle? = null
+ public Pacer(Dispatcher dispatcher, long quantumMs, LongConsumer process) {
+ this.dispatcher = dispatcher;
+ this.quantumMs = quantumMs;
+ this.process = process;
+ }
/**
* Determine whether a scheduling cycle is pending.
*/
- public val isPending: Boolean get() = handle != null
+ public boolean isPending() {
+ return handle != null;
+ }
/**
* Enqueue a new scheduling cycle.
*/
- public fun enqueue() {
+ public void enqueue() {
if (handle != null) {
- return
+ return;
}
- val quantum = quantum
- val now = clock.millis()
+ final Dispatcher dispatcher = this.dispatcher;
+ long quantumMs = this.quantumMs;
+ long now = dispatcher.getTimeSource().millis();
// We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// We calculate here the delay until the next scheduling slot.
- val timeUntilNextSlot = quantum - (now % quantum)
+ long timeUntilNextSlot = quantumMs - (now % quantumMs);
- @OptIn(InternalCoroutinesApi::class)
- handle = delay.invokeOnTimeout(timeUntilNextSlot, {
- process(now + timeUntilNextSlot)
- handle = null
- }, context)
+ handle = dispatcher.scheduleCancellable(timeUntilNextSlot, () -> {
+ process.accept(now + timeUntilNextSlot);
+ handle = null;
+ });
}
/**
* Cancel the currently pending scheduling cycle.
*/
- public fun cancel() {
- val handle = handle ?: return
- this.handle = null
- handle.dispose()
+ public void cancel() {
+ final DispatcherHandle handle = this.handle;
+ if (handle != null) {
+ this.handle = null;
+ handle.cancel();
+ }
}
}
diff --git a/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java
new file mode 100644
index 00000000..a85605e9
--- /dev/null
+++ b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.common.util;
+
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.PriorityQueue;
+import org.jetbrains.annotations.NotNull;
+import org.opendc.common.Dispatcher;
+import org.opendc.common.DispatcherHandle;
+
+/**
+ * A {@link TimerScheduler} facilitates scheduled execution of future tasks.
+ */
+public final class TimerScheduler<T> {
+ private final Dispatcher dispatcher;
+
+ /**
+ * The stack of the invocations to occur in the future.
+ */
+ private final ArrayDeque<Invocation> invocations = new ArrayDeque<>();
+
+ /**
+ * A priority queue containing the tasks to be scheduled in the future.
+ */
+ private final PriorityQueue<Timer<T>> queue = new PriorityQueue<Timer<T>>();
+
+ /**
+ * A map that keeps track of the timers.
+ */
+ private final HashMap<T, Timer<T>> timers = new HashMap<>();
+
+ /**
+ * Construct a {@link TimerScheduler} instance.
+ *
+ * @param dispatcher The {@link Dispatcher} to schedule future invocations.
+ */
+ public TimerScheduler(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] after [delay].
+ * <p>
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param delay The delay before invoking the block.
+ * @param block The block to invoke.
+ */
+ public void startSingleTimer(T key, long delay, Runnable block) {
+ startSingleTimerTo(key, dispatcher.getTimeSource().millis() + delay, block);
+ }
+
+ /**
+ * Start a timer that will invoke the specified [block] at [timestamp].
+ * <p>
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param timestamp The timestamp at which to invoke the block.
+ * @param block The block to invoke.
+ */
+ public void startSingleTimerTo(T key, long timestamp, Runnable block) {
+ long now = dispatcher.getTimeSource().millis();
+ final PriorityQueue<Timer<T>> queue = this.queue;
+ final ArrayDeque<Invocation> invocations = this.invocations;
+
+ if (timestamp < now) {
+ throw new IllegalArgumentException("Timestamp must be in the future");
+ }
+
+ timers.compute(key, (k, old) -> {
+ if (old != null && old.timestamp == timestamp) {
+ // Fast-path: timer for the same timestamp already exists
+ old.block = block;
+ return old;
+ } else {
+ // Slow-path: cancel old timer and replace it with new timer
+ Timer<T> timer = new Timer<T>(key, timestamp, block);
+
+ if (old != null) {
+ old.isCancelled = true;
+ }
+ queue.add(timer);
+ trySchedule(now, invocations, timestamp);
+
+ return timer;
+ }
+ });
+ }
+
+ /**
+ * Check if a timer with a given key is active.
+ *
+ * @param key The key to check if active.
+ * @return `true` if the timer with the specified [key] is active, `false` otherwise.
+ */
+ public boolean isTimerActive(T key) {
+ return timers.containsKey(key);
+ }
+
+ /**
+ * Cancel a timer with a given key.
+ * <p>
+ * If canceling a timer that was already canceled, or key never was used to start
+ * a timer this operation will do nothing.
+ *
+ * @param key The key of the timer to cancel.
+ */
+ public void cancel(T key) {
+ final Timer<T> timer = timers.remove(key);
+
+ // Mark the timer as cancelled
+ if (timer != null) {
+ timer.isCancelled = true;
+ }
+ }
+
+ /**
+ * Cancel all timers.
+ */
+ public void cancelAll() {
+ queue.clear();
+ timers.clear();
+
+ // Cancel all pending invocations
+ for (final Invocation invocation : invocations) {
+ invocation.cancel();
+ }
+
+ invocations.clear();
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
+ */
+ private void trySchedule(long now, ArrayDeque<Invocation> scheduled, long target) {
+ final Invocation head = scheduled.peek();
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ final DispatcherHandle handle = dispatcher.scheduleCancellable(target - now, this::doRunTimers);
+ scheduled.addFirst(new Invocation(target, handle));
+ }
+ }
+
+ /**
+ * This method is invoked when the earliest timer expires.
+ */
+ private void doRunTimers() {
+ final ArrayDeque<Invocation> invocations = this.invocations;
+ final Invocation invocation = invocations.remove();
+
+ final PriorityQueue<Timer<T>> queue = this.queue;
+ final HashMap<T, Timer<T>> timers = this.timers;
+ long now = invocation.timestamp;
+
+ while (!queue.isEmpty()) {
+ final Timer<T> timer = queue.peek();
+
+ long timestamp = timer.timestamp;
+ boolean isCancelled = timer.isCancelled;
+
+ assert timestamp >= now : "Found task in the past";
+
+ if (timestamp > now && !isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(now, invocations, timestamp);
+ break;
+ }
+
+ queue.poll();
+
+ if (!isCancelled) {
+ timers.remove(timer.key);
+ timer.run();
+ }
+ }
+ }
+
+ /**
+ * A task that is scheduled to run in the future.
+ */
+ private static class Timer<T> implements Comparable<Timer<T>> {
+ final T key;
+ final long timestamp;
+ Runnable block;
+
+ /**
+ * A flag to indicate that the task has been cancelled.
+ */
+ boolean isCancelled;
+
+ /**
+ * Construct a {@link Timer} instance.
+ */
+ public Timer(T key, long timestamp, Runnable block) {
+ this.key = key;
+ this.timestamp = timestamp;
+ this.block = block;
+ }
+
+ /**
+ * Run the task.
+ */
+ void run() {
+ block.run();
+ }
+
+ @Override
+ public int compareTo(@NotNull Timer<T> other) {
+ return Long.compare(timestamp, other.timestamp);
+ }
+ }
+
+ /**
+ * A future engine invocation.
+ * <p>
+ * This class is used to keep track of the future engine invocations created using the {@link Dispatcher} instance.
+ * In case the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private record Invocation(long timestamp, DispatcherHandle handle) {
+ /**
+ * Cancel the engine invocation.
+ */
+ void cancel() {
+ handle.cancel();
+ }
+ }
+}
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
deleted file mode 100644
index 864512d3..00000000
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.common.util
-
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import java.time.InstantSource
-import java.util.ArrayDeque
-import java.util.PriorityQueue
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A TimerScheduler facilitates scheduled execution of future tasks.
- *
- * @param context The [CoroutineContext] to run the tasks with.
- * @param clock The clock to keep track of the time.
- */
-public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: InstantSource) {
- /**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay =
- requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
-
- /**
- * The stack of the invocations to occur in the future.
- */
- private val invocations = ArrayDeque<Invocation>()
-
- /**
- * A priority queue containing the tasks to be scheduled in the future.
- */
- private val queue = PriorityQueue<Timer<T>>()
-
- /**
- * A map that keeps track of the timers.
- */
- private val timers = mutableMapOf<T, Timer<T>>()
-
- /**
- * Start a timer that will invoke the specified [block] after [delay].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param delay The delay before invoking the block.
- * @param block The block to invoke.
- */
- public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
- startSingleTimerTo(key, clock.millis() + delay, block)
- }
-
- /**
- * Start a timer that will invoke the specified [block] at [timestamp].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param timestamp The timestamp at which to invoke the block.
- * @param block The block to invoke.
- */
- public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
- val now = clock.millis()
- val queue = queue
- val invocations = invocations
-
- require(timestamp >= now) { "Timestamp must be in the future" }
-
- timers.compute(key) { _, old ->
- if (old != null && old.timestamp == timestamp) {
- // Fast-path: timer for the same timestamp already exists
- old.block = block
- old
- } else {
- // Slow-path: cancel old timer and replace it with new timer
- val timer = Timer(key, timestamp, block)
-
- old?.isCancelled = true
- queue.add(timer)
- trySchedule(now, invocations, timestamp)
-
- timer
- }
- }
- }
-
- /**
- * Check if a timer with a given key is active.
- *
- * @param key The key to check if active.
- * @return `true` if the timer with the specified [key] is active, `false` otherwise.
- */
- public fun isTimerActive(key: T): Boolean = key in timers
-
- /**
- * Cancel a timer with a given key.
- *
- * If canceling a timer that was already canceled, or key never was used to start
- * a timer this operation will do nothing.
- *
- * @param key The key of the timer to cancel.
- */
- public fun cancel(key: T) {
- val timer = timers.remove(key)
-
- // Mark the timer as cancelled
- timer?.isCancelled = true
- }
-
- /**
- * Cancel all timers.
- */
- public fun cancelAll() {
- queue.clear()
- timers.clear()
-
- // Cancel all pending invocations
- for (invocation in invocations) {
- invocation.cancel()
- }
- invocations.clear()
- }
-
- /**
- * Try to schedule an engine invocation at the specified [target].
- *
- * @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the engine invocation should happen.
- * @param scheduled The queue of scheduled invocations.
- */
- private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- val head = scheduled.peek()
-
- // Only schedule a new scheduler invocation in case the target is earlier than all other pending
- // scheduler invocations
- if (head == null || target < head.timestamp) {
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context)
- scheduled.addFirst(Invocation(target, handle))
- }
- }
-
- /**
- * This method is invoked when the earliest timer expires.
- */
- private fun doRunTimers() {
- val invocations = invocations
- val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue
- val now = invocation.timestamp
-
- while (queue.isNotEmpty()) {
- val timer = queue.peek()
-
- val timestamp = timer.timestamp
- val isCancelled = timer.isCancelled
-
- assert(timestamp >= now) { "Found task in the past" }
-
- if (timestamp > now && !isCancelled) {
- // Schedule a task for the next event to occur.
- trySchedule(now, invocations, timestamp)
- break
- }
-
- queue.poll()
-
- if (!isCancelled) {
- timers.remove(timer.key)
- timer()
- }
- }
- }
-
- /**
- * A task that is scheduled to run in the future.
- */
- private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> {
- /**
- * A flag to indicate that the task has been cancelled.
- */
- @JvmField
- var isCancelled: Boolean = false
-
- /**
- * Run the task.
- */
- operator fun invoke(): Unit = block()
-
- override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp)
- }
-
- /**
- * A future engine invocation.
- *
- * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
- * the invocation is not needed anymore, it can be cancelled via [cancel].
- */
- private class Invocation(
- @JvmField val timestamp: Long,
- @JvmField val handle: DisposableHandle
- ) {
- /**
- * Cancel the engine invocation.
- */
- fun cancel() = handle.dispose()
- }
-}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
index 51e36eea..3235b046 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt
@@ -28,26 +28,18 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.kotlin.runSimulation
-import java.time.InstantSource
-import kotlin.coroutines.EmptyCoroutineContext
/**
* Test suite for the [Pacer] class.
*/
class PacerTest {
@Test
- fun testEmptyContext() {
- assertThrows<IllegalArgumentException> { Pacer(EmptyCoroutineContext, InstantSource.system(), 100) {} }
- }
-
- @Test
fun testSingleEnqueue() {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, timeSource, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -62,7 +54,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, timeSource, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -80,7 +72,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, timeSource, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -98,7 +90,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, timeSource, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
@@ -116,7 +108,7 @@ class PacerTest {
var count = 0
runSimulation {
- val pacer = Pacer(coroutineContext, timeSource, quantum = 100) {
+ val pacer = Pacer(dispatcher, /*quantum*/ 100) {
count++
}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
index e8ec97a4..3947fa2e 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
@@ -29,22 +29,15 @@ import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.kotlin.runSimulation
-import java.time.Clock
-import kotlin.coroutines.EmptyCoroutineContext
/**
* A test suite for the [TimerScheduler] class.
*/
-internal class TimerSchedulerTest {
- @Test
- fun testEmptyContext() {
- assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) }
- }
-
+class TimerSchedulerTest {
@Test
fun testBasicTimer() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) {
assertEquals(1000, timeSource.millis())
@@ -58,7 +51,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelNonExisting() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.cancel(1)
}
@@ -67,7 +60,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelExisting() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) {
fail()
@@ -84,7 +77,7 @@ internal class TimerSchedulerTest {
@Test
fun testCancelAll() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(1, 100) { fail() }
@@ -95,7 +88,7 @@ internal class TimerSchedulerTest {
@Test
fun testOverride() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
@@ -108,7 +101,7 @@ internal class TimerSchedulerTest {
@Test
fun testOverrideBlock() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
scheduler.startSingleTimer(0, 1000) { fail() }
@@ -121,7 +114,7 @@ internal class TimerSchedulerTest {
@Test
fun testNegativeDelay() {
runSimulation {
- val scheduler = TimerScheduler<Int>(coroutineContext, timeSource)
+ val scheduler = TimerScheduler<Int>(dispatcher)
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
index 4ced9569..9d7dcba6 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt
@@ -22,6 +22,7 @@
package org.opendc.compute.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Server
import org.opendc.compute.service.driver.Host
@@ -29,8 +30,6 @@ import org.opendc.compute.service.internal.ComputeServiceImpl
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.telemetry.SchedulerStats
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* The [ComputeService] hosts the API implementation of the OpenDC Compute service.
@@ -80,18 +79,16 @@ public interface ComputeService : AutoCloseable {
/**
* Construct a new [ComputeService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] for scheduling future events.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
scheduler: ComputeScheduler,
schedulingQuantum: Duration = Duration.ofMinutes(5)
): ComputeService {
- return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum)
+ return ComputeServiceImpl(dispatcher, scheduler, schedulingQuantum)
}
}
}
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 2b755988..77932545 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.service.internal
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Flavor
@@ -37,25 +38,21 @@ import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.telemetry.SchedulerStats
import java.time.Duration
import java.time.Instant
-import java.time.InstantSource
import java.util.ArrayDeque
import java.util.Deque
import java.util.Random
import java.util.UUID
-import kotlin.coroutines.CoroutineContext
import kotlin.math.max
/**
* Internal implementation of the OpenDC Compute service.
*
- * @param coroutineContext The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] for scheduling future events.
* @param scheduler The scheduler implementation to use.
* @param schedulingQuantum The interval between scheduling cycles.
*/
internal class ComputeServiceImpl(
- coroutineContext: CoroutineContext,
- private val clock: InstantSource,
+ private val dispatcher: Dispatcher,
private val scheduler: ComputeScheduler,
schedulingQuantum: Duration
) : ComputeService, HostListener {
@@ -108,6 +105,7 @@ internal class ComputeServiceImpl(
override val hosts: Set<Host>
get() = hostToView.keys
+ private val clock = dispatcher.timeSource
private var maxCores = 0
private var maxMemory = 0L
private var _attemptsSuccess = 0L
@@ -120,7 +118,7 @@ internal class ComputeServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
override fun newClient(): ComputeClient {
check(!isClosed) { "Service is already closed" }
diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
index b790d36f..b5685aba 100644
--- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
+++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt
@@ -62,12 +62,11 @@ internal class ComputeServiceTest {
@BeforeEach
fun setUp() {
scope = SimulationCoroutineScope()
- val clock = scope.timeSource
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)),
weighers = listOf(RamWeigher())
)
- service = ComputeService(scope.coroutineContext, clock, computeScheduler)
+ service = ComputeService(scope.dispatcher, computeScheduler)
}
@Test
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index ac97552f..a496cc99 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -75,7 +75,7 @@ internal class SimHostTest {
fun testSingle() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -142,7 +142,7 @@ internal class SimHostTest {
fun testOvercommitted() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -229,7 +229,7 @@ internal class SimHostTest {
fun testFailure() = runSimulation {
val duration = 5 * 60L
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
index 66fcca22..eae5806e 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt
@@ -22,13 +22,12 @@
package org.opendc.experiments.provisioner
+import org.opendc.common.Dispatcher
import org.opendc.experiments.MutableServiceRegistry
import org.opendc.experiments.ServiceRegistry
import org.opendc.experiments.internal.ServiceRegistryImpl
-import java.time.InstantSource
import java.util.ArrayDeque
import java.util.SplittableRandom
-import kotlin.coroutines.CoroutineContext
/**
* A helper class to set up the experimental environment in a reproducible manner.
@@ -37,17 +36,15 @@ import kotlin.coroutines.CoroutineContext
* [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared
* down after the simulation completes.
*
- * @param coroutineContext The [CoroutineContext] in which the environment is set up.
- * @param clock The simulation clock represented as [InstantSource].
+ * @param dispatcher The [Dispatcher] implementation for scheduling future tasks.
* @param seed A seed for initializing the randomness of the environment.
*/
-public class Provisioner(coroutineContext: CoroutineContext, clock: InstantSource, seed: Long) : AutoCloseable {
+public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable {
/**
* Implementation of [ProvisioningContext].
*/
private val context = object : ProvisioningContext {
- override val clock: InstantSource = clock
- override val coroutineContext: CoroutineContext = coroutineContext
+ override val dispatcher: Dispatcher = dispatcher
override val seeder: SplittableRandom = SplittableRandom(seed)
override val registry: MutableServiceRegistry = ServiceRegistryImpl()
diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
index 7eec6fa4..e53044ce 100644
--- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
+++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt
@@ -22,27 +22,21 @@
package org.opendc.experiments.provisioner
+import org.opendc.common.Dispatcher
import org.opendc.experiments.MutableServiceRegistry
-import java.time.InstantSource
import java.util.SplittableRandom
import java.util.random.RandomGenerator
-import kotlin.coroutines.CoroutineContext
/**
* The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as
- * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow
+ * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow
* the provisioning steps to initialize the (simulated) resources.
*/
public interface ProvisioningContext {
/**
- * The [CoroutineContext] in which the provisioner runs.
+ * The [Dispatcher] provided by the provisioner to schedule future events during the simulation.
*/
- public val coroutineContext: CoroutineContext
-
- /**
- * The [InstantSource] tracking the virtual simulation time.
- */
- public val clock: InstantSource
+ public val dispatcher: Dispatcher
/**
* A [SplittableRandom] instance used to seed the provisioners.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index 08bb2c32..1221f084 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -75,7 +75,7 @@ class CapelinBenchmarks {
fun benchmarkCapelin() = runSimulation {
val serviceDomain = "compute.opendc.org"
- Provisioner(coroutineContext, timeSource, seed = 0).use { provisioner ->
+ Provisioner(dispatcher, seed = 0).use { provisioner ->
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
index 1f9f3439..2567a4d5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt
@@ -64,7 +64,7 @@ public class CapelinRunner(
val serviceDomain = "compute.opendc.org"
val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt"))
- Provisioner(coroutineContext, timeSource, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }),
setupHosts(serviceDomain, topology, optimize = true)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index aa7d552e..7e01bb64 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -94,7 +94,7 @@ class CapelinIntegrationTest {
val topology = createTopology()
val monitor = monitor
- Provisioner(coroutineContext, timeSource, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -138,7 +138,7 @@ class CapelinIntegrationTest {
val topology = createTopology("single")
val monitor = monitor
- Provisioner(coroutineContext, timeSource, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -177,7 +177,7 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(1.0, seed)
val topology = createTopology("single")
- Provisioner(coroutineContext, timeSource, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
@@ -216,7 +216,7 @@ class CapelinIntegrationTest {
val workload = createTestWorkload(0.25, seed)
val monitor = monitor
- Provisioner(coroutineContext, timeSource, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }),
registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor),
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
index 38cbf2dc..d7347327 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt
@@ -41,7 +41,7 @@ public class ComputeServiceProvisioningStep internal constructor(
private val schedulingQuantum: Duration
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum)
+ val service = ComputeService(ctx.dispatcher, scheduler(ctx), schedulingQuantum)
ctx.registry.register(serviceDomain, ComputeService::class.java, service)
return AutoCloseable { service.close() }
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
index e224fb84..310aa54c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt
@@ -46,7 +46,7 @@ public class HostsProvisioningStep internal constructor(
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" }
- val engine = FlowEngine.create(ctx.coroutineContext, ctx.clock)
+ val engine = FlowEngine.create(ctx.dispatcher)
val graph = engine.newGraph()
val hosts = mutableSetOf<SimHost>()
@@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor(
spec.uid,
spec.name,
spec.meta,
- ctx.clock,
+ ctx.dispatcher.timeSource,
machine,
hypervisor,
optimize = optimize
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
index f39f74bc..efd38a3c 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt
@@ -27,6 +27,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.compute.api.Server
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
@@ -37,26 +39,25 @@ import org.opendc.experiments.compute.telemetry.table.ServerTableReader
import org.opendc.experiments.compute.telemetry.table.ServiceTableReader
import java.time.Duration
import java.time.Instant
-import java.time.InstantSource
/**
* A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every
* export interval.
*
- * @param scope The [CoroutineScope] to run the reader in.
- * @param clock The virtual clock.
+ * @param dispatcher A [Dispatcher] for scheduling the future events.
* @param service The [ComputeService] to monitor.
* @param monitor The monitor to export the metrics to.
* @param exportInterval The export interval.
*/
public class ComputeMetricReader(
- scope: CoroutineScope,
- clock: InstantSource,
+ dispatcher: Dispatcher,
private val service: ComputeService,
private val monitor: ComputeMonitor,
private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
+ private val clock = dispatcher.timeSource
/**
* Aggregator for service metrics.
diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
index 68ca5ae8..665611dd 100644
--- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt
@@ -22,9 +22,6 @@
package org.opendc.experiments.compute.telemetry
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import org.opendc.compute.service.ComputeService
import org.opendc.experiments.provisioner.ProvisioningContext
import org.opendc.experiments.provisioner.ProvisioningStep
@@ -40,13 +37,8 @@ public class ComputeMonitorProvisioningStep internal constructor(
private val exportInterval: Duration
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val scope = CoroutineScope(ctx.coroutineContext + Job())
val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" }
- val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval)
-
- return AutoCloseable {
- metricReader.close()
- scope.cancel()
- }
+ val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval)
+ return AutoCloseable { metricReader.close() }
}
}
diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
index 3b4200c8..e5c2f86a 100644
--- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt
@@ -56,10 +56,9 @@ public class FaaSServiceProvisioningStep internal constructor(
} else {
ZeroDelayInjector
}
- val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector)
+ val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector)
val service = FaaSService(
- ctx.coroutineContext,
- ctx.clock,
+ ctx.dispatcher,
deployer,
routingPolicy(ctx),
terminationPolicy(ctx)
diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
index ff825260..4a4d9ae0 100644
--- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
+++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt
@@ -49,12 +49,12 @@ class FaaSExperiment {
fun testSmoke() = runSimulation {
val faasService = "faas.opendc.org"
- Provisioner(coroutineContext, timeSource, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
provisioner.runStep(
setupFaaSService(
faasService,
{ RandomRoutingPolicy() },
- { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) },
+ { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) },
createMachineModel(),
coldStartModel = ColdStartModel.GOOGLE
)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 51f6e763..53bf5aa6 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -22,12 +22,9 @@
package org.opendc.experiments.tf20.core
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
@@ -36,17 +33,14 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.CpuPowerModel
-import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow2.FlowEngine
import org.opendc.simulator.flow2.FlowStage
import org.opendc.simulator.flow2.FlowStageLogic
import org.opendc.simulator.flow2.OutPort
-import java.time.InstantSource
import java.util.ArrayDeque
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.math.ceil
import kotlin.math.roundToLong
@@ -57,22 +51,16 @@ import kotlin.math.roundToLong
public class SimTFDevice(
override val uid: UUID,
override val isGpu: Boolean,
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
pu: ProcessingUnit,
private val memory: MemoryUnit,
powerModel: CpuPowerModel
) : TFDevice {
/**
- * The scope in which the device runs.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine.create(
- FlowEngine.create(context, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
MachineModel(listOf(pu), listOf(memory)),
SimPsuFactories.simple(powerModel)
)
@@ -162,9 +150,7 @@ public class SimTFDevice(
}
init {
- scope.launch {
- machine.runWorkload(workload)
- }
+ machine.startWorkload(workload, emptyMap()) {}
}
override suspend fun load(dataSize: Long) {
@@ -185,7 +171,6 @@ public class SimTFDevice(
override fun close() {
machine.cancel()
- scope.cancel()
}
private data class Work(var flops: Double, val cont: Continuation<Unit>) {
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
index 6fcdf513..5b408fb3 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
@@ -23,19 +23,18 @@
package org.opendc.experiments.tf20.network
import kotlinx.coroutines.channels.Channel
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* The network controller represents a simple network model between the worker and master nodes during
* TensorFlow execution.
*/
-public class NetworkController(context: CoroutineContext, clock: InstantSource) : AutoCloseable {
+public class NetworkController(dispatcher: Dispatcher) : AutoCloseable {
/**
* The scheduler for the message.
*/
- private val scheduler = TimerScheduler<Message>(context, clock)
+ private val scheduler = TimerScheduler<Message>(dispatcher)
/**
* The outbound communication channels.
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
index d01a4a3c..899aafc0 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
@@ -48,8 +48,7 @@ class TensorFlowTest {
val device = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- timeSource,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -83,8 +82,7 @@ class TensorFlowTest {
val device = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- timeSource,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -118,8 +116,7 @@ class TensorFlowTest {
val deviceA = SimTFDevice(
def.uid,
def.meta["gpu"] as Boolean,
- coroutineContext,
- timeSource,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
@@ -128,8 +125,7 @@ class TensorFlowTest {
val deviceB = SimTFDevice(
UUID.randomUUID(),
def.meta["gpu"] as Boolean,
- coroutineContext,
- timeSource,
+ dispatcher,
def.model.cpus[0],
def.model.memory[0],
CpuPowerModels.linear(250.0, 60.0)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
index 9f15eab6..549c6f3e 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
@@ -47,8 +47,7 @@ internal class SimTFDeviceTest {
val device = SimTFDevice(
UUID.randomUUID(),
isGpu = true,
- coroutineContext,
- timeSource,
+ dispatcher,
pu,
memory,
CpuPowerModels.linear(250.0, 100.0)
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
index 5cee9abf..fe4fde17 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -47,8 +47,7 @@ public class WorkflowServiceProvisioningStep internal constructor(
val client = computeService.newClient()
val service = WorkflowService(
- ctx.coroutineContext,
- ctx.clock,
+ ctx.dispatcher,
client,
scheduler.schedulingQuantum,
jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 53706c57..96619cdb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -22,6 +22,7 @@
package org.opendc.faas.service
+import org.opendc.common.Dispatcher
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
@@ -31,8 +32,6 @@ import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* The [FaaSService] hosts the service implementation of the OpenDC FaaS platform.
@@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable {
/**
* Construct a new [FaaSService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
+ * @param dispatcher The [Dispatcher] used for scheduling events.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
* @param quantum The scheduling quantum of the service (100 ms default)
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
quantum: Duration = Duration.ofMillis(100)
): FaaSService {
- return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum)
+ return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index f494adb1..a2c371e1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -22,12 +22,11 @@
package org.opendc.faas.service.autoscaler
+import org.opendc.common.Dispatcher
import org.opendc.common.util.TimerScheduler
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time.
@@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext
* @param timeout The idle timeout after which the function instance is terminated.
*/
public class FunctionTerminationPolicyFixed(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
*/
- private val scheduler = TimerScheduler<FunctionInstance>(context, clock)
+ private val scheduler = TimerScheduler<FunctionInstance>(dispatcher)
override fun enqueue(instance: FunctionInstance) {
// Cancel the existing timeout timer
@@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
+ scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 3235ff1a..b1e6b3f5 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -22,13 +22,11 @@
package org.opendc.faas.service.internal
-import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import kotlinx.coroutines.suspendCancellableCoroutine
import mu.KotlinLogging
+import org.opendc.common.Dispatcher
import org.opendc.common.util.Pacer
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
@@ -49,7 +47,6 @@ import java.util.ArrayDeque
import java.util.Random
import java.util.UUID
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resumeWithException
/**
@@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException
* this component queues the events to await the deployment of new instances.
*/
internal class FaaSServiceImpl(
- context: CoroutineContext,
- private val clock: InstantSource,
+ dispatcher: Dispatcher,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy,
quantum: Duration
) : FaaSService, FunctionInstanceListener {
/**
- * The [CoroutineScope] of the service bounded by the lifecycle of the service.
- */
- private val scope = CoroutineScope(context + Job())
-
- /**
* The logger instance of this server.
*/
private val logger = KotlinLogging.logger {}
@@ -80,7 +71,12 @@ internal class FaaSServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() }
+
+ /**
+ * The [InstantSource] instance representing the clock.
+ */
+ private val clock = dispatcher.timeSource
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -266,8 +262,6 @@ internal class FaaSServiceImpl(
}
override fun close() {
- scope.cancel()
-
// Stop all function instances
for ((_, function) in functions) {
function.close()
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index e29864da..9676744b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,7 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -58,7 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -67,7 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -78,7 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -91,7 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -104,7 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -117,7 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -128,7 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -142,7 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runSimulation {
- val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk())
+ val service = FaaSService(dispatcher, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -155,7 +155,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runSimulation {
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, timeSource, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 9ec26d5d..47b4d4fa 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -31,6 +31,8 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.faas.service.FunctionObject
import org.opendc.faas.service.deployer.FunctionDeployer
import org.opendc.faas.service.deployer.FunctionInstance
@@ -44,10 +46,8 @@ import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.flow2.FlowEngine
-import java.time.InstantSource
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
@@ -55,8 +55,7 @@ import kotlin.coroutines.resumeWithException
* A [FunctionDeployer] that uses that simulates the [FunctionInstance]s.
*/
public class SimFunctionDeployer(
- context: CoroutineContext,
- private val clock: InstantSource,
+ private val dispatcher: Dispatcher,
private val model: MachineModel,
private val delayInjector: DelayInjector,
private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper()
@@ -64,7 +63,7 @@ public class SimFunctionDeployer(
/**
* The [CoroutineScope] of this deployer.
*/
- private val scope = CoroutineScope(context + Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job())
override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance {
val instance = Instance(function, listener)
@@ -86,7 +85,7 @@ public class SimFunctionDeployer(
* The machine that will execute the workloads.
*/
public val machine: SimMachine = SimBareMetalMachine.create(
- FlowEngine.create(scope.coroutineContext, clock).newGraph(),
+ FlowEngine.create(dispatcher).newGraph(),
model
)
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index e51c3019..be133ded 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -73,13 +73,12 @@ internal class SimFaaSServiceTest {
})
val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
- val deployer = SimFunctionDeployer(coroutineContext, timeSource, machineModel, delayInjector) { workload }
+ val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload }
val service = FaaSService(
- coroutineContext,
- timeSource,
+ dispatcher,
deployer,
RandomRoutingPolicy(),
- FunctionTerminationPolicyFixed(coroutineContext, timeSource, timeout = Duration.ofMillis(10000))
+ FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000))
)
val client = service.newClient()
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index b761598b..eea46b95 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -74,7 +74,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkBareMetal() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
return@runSimulation machine.runWorkload(trace.createWorkload(0))
@@ -84,7 +84,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkSpaceSharedHypervisor() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1))
@@ -105,7 +105,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkFairShareHypervisorSingle() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
@@ -126,7 +126,7 @@ class SimMachineBenchmarks {
@Benchmark
fun benchmarkFairShareHypervisorDouble() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1))
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 646d687d..58b01e06 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -72,7 +72,7 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -97,7 +97,7 @@ class SimMachineTest {
}
val trace = builder.build()
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
graph,
@@ -112,7 +112,7 @@ class SimMachineTest {
@Test
fun testDualSocketMachine() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val cpuNode = machineModel.cpus[0].node
@@ -133,7 +133,7 @@ class SimMachineTest {
@Test
fun testPower() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
graph,
@@ -156,7 +156,7 @@ class SimMachineTest {
@Test
fun testCapacityClamp() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -184,7 +184,7 @@ class SimMachineTest {
@Test
fun testMemory() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -206,7 +206,7 @@ class SimMachineTest {
@Test
fun testMemoryUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -230,7 +230,7 @@ class SimMachineTest {
@Test
fun testNetUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -258,7 +258,7 @@ class SimMachineTest {
@Test
fun testDiskReadUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -283,7 +283,7 @@ class SimMachineTest {
@Test
fun testDiskWriteUsage() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -308,7 +308,7 @@ class SimMachineTest {
@Test
fun testCancellation() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -330,7 +330,7 @@ class SimMachineTest {
@Test
fun testConcurrentRuns() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -351,7 +351,7 @@ class SimMachineTest {
@Test
fun testCatchStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -367,7 +367,7 @@ class SimMachineTest {
@Test
fun testCatchStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -384,7 +384,7 @@ class SimMachineTest {
@Test
fun testCatchShutdownFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -400,7 +400,7 @@ class SimMachineTest {
@Test
fun testCatchNestedFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index f60ff67c..99f47b2f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -74,7 +74,7 @@ internal class SimFairShareHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -118,7 +118,7 @@ internal class SimFairShareHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -157,7 +157,7 @@ internal class SimFairShareHypervisorTest {
/*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
@@ -184,7 +184,7 @@ internal class SimFairShareHypervisorTest {
.addGroup(setOf("a", "n"), 0.1, 0.8)
.build()
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, model)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 31718794..93b67aa3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest {
SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
).createWorkload(0)
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -99,7 +99,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runSimulation {
val duration = 5 * 60L * 1000
val workload = SimWorkloads.runtime(duration, 1.0)
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -123,7 +123,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runSimulation {
val duration = 5 * 60L * 1000
val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0)
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -144,7 +144,7 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runSimulation {
val duration = 5 * 60L * 1000
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -173,7 +173,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
@@ -200,7 +200,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
index c208a2af..08bb6509 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
@@ -59,7 +59,7 @@ class SimChainWorkloadTest {
@Test
fun testMultipleWorkloads() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -80,7 +80,7 @@ class SimChainWorkloadTest {
@Test
fun testStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -105,7 +105,7 @@ class SimChainWorkloadTest {
@Test
fun testStartFailureSecond() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -131,7 +131,7 @@ class SimChainWorkloadTest {
@Test
fun testStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -155,7 +155,7 @@ class SimChainWorkloadTest {
@Test
fun testStopFailureSecond() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -180,7 +180,7 @@ class SimChainWorkloadTest {
@Test
fun testStartAndStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -206,7 +206,7 @@ class SimChainWorkloadTest {
@Test
fun testShutdownAndStopFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -232,7 +232,7 @@ class SimChainWorkloadTest {
@Test
fun testShutdownAndStartFailure() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -260,7 +260,7 @@ class SimChainWorkloadTest {
@Test
fun testSnapshot() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(graph, machineModel)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index c0bdfd25..5c888fbc 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -53,7 +53,7 @@ class SimTraceWorkloadTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -76,7 +76,7 @@ class SimTraceWorkloadTest {
@Test
fun testOffset() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -99,7 +99,7 @@ class SimTraceWorkloadTest {
@Test
fun testSkipFragment() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
@@ -123,7 +123,7 @@ class SimTraceWorkloadTest {
@Test
fun testZeroCores() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val machine = SimBareMetalMachine.create(
diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index 04d46607..4f04bdc1 100644
--- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -28,8 +28,8 @@ plugins {
}
dependencies {
- api(libs.kotlinx.coroutines)
- implementation(libs.kotlin.logging)
+ api(projects.opendc.opendcCommon)
+ implementation(libs.slf4j.api)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(libs.slf4j.simple)
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
index 5a67c7d2..59dd3bad 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
@@ -60,7 +60,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkSink() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -71,7 +71,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkForward() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -85,7 +85,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinSingleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -103,7 +103,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinTripleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
index cfa5a48f..c0f52505 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
@@ -26,9 +26,8 @@ import java.time.Clock;
import java.time.InstantSource;
import java.util.ArrayList;
import java.util.List;
-import kotlin.coroutines.ContinuationInterceptor;
import kotlin.coroutines.CoroutineContext;
-import kotlinx.coroutines.Delay;
+import org.opendc.common.Dispatcher;
/**
* A {@link FlowEngine} simulates a generic flow network.
@@ -57,23 +56,19 @@ public final class FlowEngine implements Runnable {
*/
private boolean active;
- private final CoroutineContext coroutineContext;
+ private final Dispatcher dispatcher;
private final InstantSource clock;
- private final Delay delay;
/**
* Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}.
*/
- public static FlowEngine create(CoroutineContext coroutineContext, InstantSource clock) {
- return new FlowEngine(coroutineContext, clock);
+ public static FlowEngine create(Dispatcher dispatcher) {
+ return new FlowEngine(dispatcher);
}
- FlowEngine(CoroutineContext coroutineContext, InstantSource clock) {
- this.coroutineContext = coroutineContext;
- this.clock = clock;
-
- CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key;
- this.delay = (Delay) coroutineContext.get(key);
+ FlowEngine(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ this.clock = dispatcher.getTimeSource();
}
/**
@@ -205,7 +200,7 @@ public final class FlowEngine implements Runnable {
// Only schedule a new scheduler invocation in case the target is earlier than all other pending
// scheduler invocations
if (scheduled.tryAdd(target)) {
- delay.invokeOnTimeout(target - now, this, coroutineContext);
+ dispatcher.schedule(target - now, this);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
index b5054375..467bf334 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class FlowEngineTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val multiplexer = MaxMinFlowMultiplexer(graph)
@@ -55,7 +55,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -65,7 +65,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -75,7 +75,7 @@ class FlowEngineTest {
@Test
fun testConnectInletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -87,7 +87,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -99,7 +99,7 @@ class FlowEngineTest {
@Test
fun testConnectInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 2.0f)
@@ -112,7 +112,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -125,7 +125,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -134,7 +134,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -143,7 +143,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -154,7 +154,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -165,7 +165,7 @@ class FlowEngineTest {
@Test
fun testInletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -181,7 +181,7 @@ class FlowEngineTest {
@Test
fun testOutletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
index d7a2190f..fef49786 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
@@ -39,7 +39,7 @@ class ForwardingFlowMultiplexerTest {
*/
@Test
fun testTrace() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = ForwardingFlowMultiplexer(graph)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
index 635b1d98..ebae2d4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
class MaxMinFlowMultiplexerTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
index d50a40b0..ea516c63 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -37,7 +37,7 @@ import java.util.concurrent.ThreadLocalRandom
class FlowSinkTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -51,7 +51,7 @@ class FlowSinkTest {
@Test
fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -69,7 +69,7 @@ class FlowSinkTest {
@Test
fun testUtilization() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -83,7 +83,7 @@ class FlowSinkTest {
@Test
fun testFragments() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -114,7 +114,7 @@ class FlowSinkTest {
)
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index c1a558b8..181d9a20 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -43,7 +43,7 @@ import org.opendc.simulator.kotlin.runSimulation
class SimNetworkSinkTest {
@Test
fun testInitialState() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -56,7 +56,7 @@ class SimNetworkSinkTest {
@Test
fun testDisconnectIdempotent() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -66,7 +66,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectCircular() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
@@ -77,7 +77,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnectedTarget() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = mockk<SimNetworkPort>(relaxUnitFun = true)
@@ -90,7 +90,7 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source1 = TestSource(graph)
@@ -107,7 +107,7 @@ class SimNetworkSinkTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
@@ -127,7 +127,7 @@ class SimNetworkSinkTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
index e45b1bd7..4a489478 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class SimNetworkSwitchVirtualTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val source = TestSource(graph)
@@ -60,7 +60,7 @@ class SimNetworkSwitchVirtualTest {
@Test
fun testConnectClosedPort() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimNetworkSink(graph, /*capacity*/ 100.0f)
val switch = SimNetworkSwitchVirtual(graph)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index 2e0dc5c4..f596ca4e 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimPduTest {
@Test
fun testZeroOutlets() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
@@ -48,7 +48,7 @@ internal class SimPduTest {
@Test
fun testSingleOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
@@ -62,7 +62,7 @@ internal class SimPduTest {
@Test
fun testDoubleOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 200.0f)
val pdu = SimPdu(graph)
@@ -78,7 +78,7 @@ internal class SimPduTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 300.0f)
val pdu = SimPdu(graph)
@@ -95,7 +95,7 @@ internal class SimPduTest {
@Test
fun testLoss() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 500.0f)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
@@ -110,7 +110,7 @@ internal class SimPduTest {
@Test
fun testOutletClose() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val pdu = SimPdu(graph)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index 0f145592..03c942b4 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -42,7 +42,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimPowerSourceTest {
@Test
fun testInitialState() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -57,7 +57,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectIdempotent() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -67,7 +67,7 @@ internal class SimPowerSourceTest {
@Test
fun testConnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -87,7 +87,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -102,7 +102,7 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectAssertion() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
@@ -120,7 +120,7 @@ internal class SimPowerSourceTest {
@Test
fun testOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = TestInlet(graph)
@@ -135,7 +135,7 @@ internal class SimPowerSourceTest {
@Test
fun testInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 100.0f)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index 4ce83fe9..89fede63 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
internal class SimUpsTest {
@Test
fun testSingleInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 200.0f)
val ups = SimUps(graph)
@@ -49,7 +49,7 @@ internal class SimUpsTest {
@Test
fun testDoubleInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source1 = SimPowerSource(graph, /*capacity*/ 200.0f)
val source2 = SimPowerSource(graph, /*capacity*/ 200.0f)
@@ -69,7 +69,7 @@ internal class SimUpsTest {
@Test
fun testLoss() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source = SimPowerSource(graph, /*capacity*/ 500.0f)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
@@ -84,7 +84,7 @@ internal class SimUpsTest {
@Test
fun testDisconnect() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, timeSource)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val source1 = SimPowerSource(graph, /*capacity*/ 200.0f)
val source2 = SimPowerSource(graph, /*capacity*/ 200.0f)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 4c6fe755..86c1c521 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -260,7 +260,7 @@ public class OpenDCRunner(
val scenario = scenario
- Provisioner(coroutineContext, timeSource, seed).use { provisioner ->
+ Provisioner(dispatcher, seed).use { provisioner ->
provisioner.runSteps(
setupComputeService(
serviceDomain,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index f0e86449..07b43b6d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import org.opendc.common.Dispatcher
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -31,8 +32,6 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
import java.time.Duration
-import java.time.InstantSource
-import kotlin.coroutines.CoroutineContext
/**
* A service for cloud workflow execution.
@@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable {
/**
* Construct a new [WorkflowService] implementation.
*
- * @param context The [CoroutineContext] to use in the service.
- * @param clock The clock instance to use.
- * @param meterProvider The meter provider to use.
+ * @param dispatcher A [Dispatcher] to schedule future events.
* @param compute The "Compute" client to use.
* @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
@@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable {
* @param taskOrderPolicy The task order policy to use.
*/
public operator fun invoke(
- context: CoroutineContext,
- clock: InstantSource,
+ dispatcher: Dispatcher,
compute: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable {
taskOrderPolicy: TaskOrderPolicy
): WorkflowService {
return WorkflowServiceImpl(
- context,
- clock,
+ dispatcher,
compute,
schedulingQuantum,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 20e30fd4..01c1f565 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.common.Dispatcher
+import org.opendc.common.asCoroutineDispatcher
import org.opendc.common.util.Pacer
import org.opendc.compute.api.ComputeClient
import org.opendc.compute.api.Image
@@ -44,7 +46,6 @@ import java.time.Duration
import java.time.InstantSource
import java.util.PriorityQueue
import java.util.Queue
-import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
/**
@@ -52,8 +53,7 @@ import kotlin.coroutines.resume
* Datacenter Scheduling.
*/
public class WorkflowServiceImpl(
- context: CoroutineContext,
- private val clock: InstantSource,
+ dispatcher: Dispatcher,
private val computeClient: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -64,7 +64,12 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context + kotlinx.coroutines.Job())
+ private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job())
+
+ /**
+ * The [InstantSource] representing the clock of this service.
+ */
+ private val clock = dispatcher.timeSource
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -149,7 +154,7 @@ public class WorkflowServiceImpl(
/**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() }
+ private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() }
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index f5edbb2f..e5e05a92 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -70,7 +70,7 @@ internal class WorkflowServiceTest {
val computeService = "compute.opendc.org"
val workflowService = "workflow.opendc.org"
- Provisioner(coroutineContext, timeSource, seed = 0L).use { provisioner ->
+ Provisioner(dispatcher, seed = 0L).use { provisioner ->
val scheduler: (ProvisioningContext) -> ComputeScheduler = {
FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),