diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-11-09 21:59:07 +0000 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-11-13 17:42:01 +0000 |
| commit | fb2672afb2d8236d5291cd028196c99d8e4d47f1 (patch) | |
| tree | 508bbec117239b3d8490cd1bde8d12b6a8ab2155 | |
| parent | 00ac59e8e9d6a41c2eac55aa25420dce8fa9c6e0 (diff) | |
refactor: Replace use of CoroutineContext by Dispatcher
This change replaces the use of `CoroutineContext` for passing the
`SimulationDispatcher` across the different modules of OpenDC by the
lightweight `Dispatcher` interface of the OpenDC common module.
54 files changed, 518 insertions, 569 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 033e8cfb..68f3119f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -36,6 +36,7 @@ kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", # Logging kotlin-logging = { module = "io.github.microutils:kotlin-logging", version.ref = "kotlin-logging" } +slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } log4j-core = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" } log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" } diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java index b6141db1..5b8d8cb0 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/util/Pacer.kt +++ b/opendc-common/src/main/java/org/opendc/common/util/Pacer.java @@ -20,75 +20,75 @@ * SOFTWARE. */ -package org.opendc.common.util +package org.opendc.common.util; -import kotlinx.coroutines.Delay -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.InternalCoroutinesApi -import java.lang.Runnable -import java.time.InstantSource -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext +import java.util.function.LongConsumer; +import org.opendc.common.Dispatcher; +import org.opendc.common.DispatcherHandle; /** * Helper class to pace the incoming scheduling requests. - * - * @param context The [CoroutineContext] in which the pacer runs. - * @param clock The virtual simulation clock. - * @param quantum The scheduling quantum. - * @param process The process to invoke for the incoming requests. */ -public class Pacer( - private val context: CoroutineContext, - private val clock: InstantSource, - private val quantum: Long, - private val process: (Long) -> Unit -) { +public final class Pacer { + private final Dispatcher dispatcher; + private final long quantumMs; + private final LongConsumer process; + /** - * The [Delay] instance that provides scheduled execution of [Runnable]s. + * The current {@link DispatcherHandle} representing the pending scheduling cycle. */ - @OptIn(InternalCoroutinesApi::class) - private val delay = - requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + private DispatcherHandle handle; /** - * The current [DisposableHandle] representing the pending scheduling cycle. + * Construct a {@link Pacer} instance. + * + * @param dispatcher The {@link Dispatcher} to schedule future invocations. + * @param quantumMs The scheduling quantum in milliseconds. + * @param process The process to invoke for the incoming requests. */ - private var handle: DisposableHandle? = null + public Pacer(Dispatcher dispatcher, long quantumMs, LongConsumer process) { + this.dispatcher = dispatcher; + this.quantumMs = quantumMs; + this.process = process; + } /** * Determine whether a scheduling cycle is pending. */ - public val isPending: Boolean get() = handle != null + public boolean isPending() { + return handle != null; + } /** * Enqueue a new scheduling cycle. */ - public fun enqueue() { + public void enqueue() { if (handle != null) { - return + return; } - val quantum = quantum - val now = clock.millis() + final Dispatcher dispatcher = this.dispatcher; + long quantumMs = this.quantumMs; + long now = dispatcher.getTimeSource().millis(); // We assume that the scheduler runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // We calculate here the delay until the next scheduling slot. - val timeUntilNextSlot = quantum - (now % quantum) + long timeUntilNextSlot = quantumMs - (now % quantumMs); - @OptIn(InternalCoroutinesApi::class) - handle = delay.invokeOnTimeout(timeUntilNextSlot, { - process(now + timeUntilNextSlot) - handle = null - }, context) + handle = dispatcher.scheduleCancellable(timeUntilNextSlot, () -> { + process.accept(now + timeUntilNextSlot); + handle = null; + }); } /** * Cancel the currently pending scheduling cycle. */ - public fun cancel() { - val handle = handle ?: return - this.handle = null - handle.dispose() + public void cancel() { + final DispatcherHandle handle = this.handle; + if (handle != null) { + this.handle = null; + handle.cancel(); + } } } diff --git a/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java new file mode 100644 index 00000000..a85605e9 --- /dev/null +++ b/opendc-common/src/main/java/org/opendc/common/util/TimerScheduler.java @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.common.util; + +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.PriorityQueue; +import org.jetbrains.annotations.NotNull; +import org.opendc.common.Dispatcher; +import org.opendc.common.DispatcherHandle; + +/** + * A {@link TimerScheduler} facilitates scheduled execution of future tasks. + */ +public final class TimerScheduler<T> { + private final Dispatcher dispatcher; + + /** + * The stack of the invocations to occur in the future. + */ + private final ArrayDeque<Invocation> invocations = new ArrayDeque<>(); + + /** + * A priority queue containing the tasks to be scheduled in the future. + */ + private final PriorityQueue<Timer<T>> queue = new PriorityQueue<Timer<T>>(); + + /** + * A map that keeps track of the timers. + */ + private final HashMap<T, Timer<T>> timers = new HashMap<>(); + + /** + * Construct a {@link TimerScheduler} instance. + * + * @param dispatcher The {@link Dispatcher} to schedule future invocations. + */ + public TimerScheduler(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + /** + * Start a timer that will invoke the specified [block] after [delay]. + * <p> + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param delay The delay before invoking the block. + * @param block The block to invoke. + */ + public void startSingleTimer(T key, long delay, Runnable block) { + startSingleTimerTo(key, dispatcher.getTimeSource().millis() + delay, block); + } + + /** + * Start a timer that will invoke the specified [block] at [timestamp]. + * <p> + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param timestamp The timestamp at which to invoke the block. + * @param block The block to invoke. + */ + public void startSingleTimerTo(T key, long timestamp, Runnable block) { + long now = dispatcher.getTimeSource().millis(); + final PriorityQueue<Timer<T>> queue = this.queue; + final ArrayDeque<Invocation> invocations = this.invocations; + + if (timestamp < now) { + throw new IllegalArgumentException("Timestamp must be in the future"); + } + + timers.compute(key, (k, old) -> { + if (old != null && old.timestamp == timestamp) { + // Fast-path: timer for the same timestamp already exists + old.block = block; + return old; + } else { + // Slow-path: cancel old timer and replace it with new timer + Timer<T> timer = new Timer<T>(key, timestamp, block); + + if (old != null) { + old.isCancelled = true; + } + queue.add(timer); + trySchedule(now, invocations, timestamp); + + return timer; + } + }); + } + + /** + * Check if a timer with a given key is active. + * + * @param key The key to check if active. + * @return `true` if the timer with the specified [key] is active, `false` otherwise. + */ + public boolean isTimerActive(T key) { + return timers.containsKey(key); + } + + /** + * Cancel a timer with a given key. + * <p> + * If canceling a timer that was already canceled, or key never was used to start + * a timer this operation will do nothing. + * + * @param key The key of the timer to cancel. + */ + public void cancel(T key) { + final Timer<T> timer = timers.remove(key); + + // Mark the timer as cancelled + if (timer != null) { + timer.isCancelled = true; + } + } + + /** + * Cancel all timers. + */ + public void cancelAll() { + queue.clear(); + timers.clear(); + + // Cancel all pending invocations + for (final Invocation invocation : invocations) { + invocation.cancel(); + } + + invocations.clear(); + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + * @param scheduled The queue of scheduled invocations. + */ + private void trySchedule(long now, ArrayDeque<Invocation> scheduled, long target) { + final Invocation head = scheduled.peek(); + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + final DispatcherHandle handle = dispatcher.scheduleCancellable(target - now, this::doRunTimers); + scheduled.addFirst(new Invocation(target, handle)); + } + } + + /** + * This method is invoked when the earliest timer expires. + */ + private void doRunTimers() { + final ArrayDeque<Invocation> invocations = this.invocations; + final Invocation invocation = invocations.remove(); + + final PriorityQueue<Timer<T>> queue = this.queue; + final HashMap<T, Timer<T>> timers = this.timers; + long now = invocation.timestamp; + + while (!queue.isEmpty()) { + final Timer<T> timer = queue.peek(); + + long timestamp = timer.timestamp; + boolean isCancelled = timer.isCancelled; + + assert timestamp >= now : "Found task in the past"; + + if (timestamp > now && !isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(now, invocations, timestamp); + break; + } + + queue.poll(); + + if (!isCancelled) { + timers.remove(timer.key); + timer.run(); + } + } + } + + /** + * A task that is scheduled to run in the future. + */ + private static class Timer<T> implements Comparable<Timer<T>> { + final T key; + final long timestamp; + Runnable block; + + /** + * A flag to indicate that the task has been cancelled. + */ + boolean isCancelled; + + /** + * Construct a {@link Timer} instance. + */ + public Timer(T key, long timestamp, Runnable block) { + this.key = key; + this.timestamp = timestamp; + this.block = block; + } + + /** + * Run the task. + */ + void run() { + block.run(); + } + + @Override + public int compareTo(@NotNull Timer<T> other) { + return Long.compare(timestamp, other.timestamp); + } + } + + /** + * A future engine invocation. + * <p> + * This class is used to keep track of the future engine invocations created using the {@link Dispatcher} instance. + * In case the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private record Invocation(long timestamp, DispatcherHandle handle) { + /** + * Cancel the engine invocation. + */ + void cancel() { + handle.cancel(); + } + } +} diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt deleted file mode 100644 index 864512d3..00000000 --- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.common.util - -import kotlinx.coroutines.Delay -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.InternalCoroutinesApi -import java.time.InstantSource -import java.util.ArrayDeque -import java.util.PriorityQueue -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext - -/** - * A TimerScheduler facilitates scheduled execution of future tasks. - * - * @param context The [CoroutineContext] to run the tasks with. - * @param clock The clock to keep track of the time. - */ -public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: InstantSource) { - /** - * The [Delay] instance that provides scheduled execution of [Runnable]s. - */ - @OptIn(InternalCoroutinesApi::class) - private val delay = - requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } - - /** - * The stack of the invocations to occur in the future. - */ - private val invocations = ArrayDeque<Invocation>() - - /** - * A priority queue containing the tasks to be scheduled in the future. - */ - private val queue = PriorityQueue<Timer<T>>() - - /** - * A map that keeps track of the timers. - */ - private val timers = mutableMapOf<T, Timer<T>>() - - /** - * Start a timer that will invoke the specified [block] after [delay]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param delay The delay before invoking the block. - * @param block The block to invoke. - */ - public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { - startSingleTimerTo(key, clock.millis() + delay, block) - } - - /** - * Start a timer that will invoke the specified [block] at [timestamp]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param timestamp The timestamp at which to invoke the block. - * @param block The block to invoke. - */ - public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { - val now = clock.millis() - val queue = queue - val invocations = invocations - - require(timestamp >= now) { "Timestamp must be in the future" } - - timers.compute(key) { _, old -> - if (old != null && old.timestamp == timestamp) { - // Fast-path: timer for the same timestamp already exists - old.block = block - old - } else { - // Slow-path: cancel old timer and replace it with new timer - val timer = Timer(key, timestamp, block) - - old?.isCancelled = true - queue.add(timer) - trySchedule(now, invocations, timestamp) - - timer - } - } - } - - /** - * Check if a timer with a given key is active. - * - * @param key The key to check if active. - * @return `true` if the timer with the specified [key] is active, `false` otherwise. - */ - public fun isTimerActive(key: T): Boolean = key in timers - - /** - * Cancel a timer with a given key. - * - * If canceling a timer that was already canceled, or key never was used to start - * a timer this operation will do nothing. - * - * @param key The key of the timer to cancel. - */ - public fun cancel(key: T) { - val timer = timers.remove(key) - - // Mark the timer as cancelled - timer?.isCancelled = true - } - - /** - * Cancel all timers. - */ - public fun cancelAll() { - queue.clear() - timers.clear() - - // Cancel all pending invocations - for (invocation in invocations) { - invocation.cancel() - } - invocations.clear() - } - - /** - * Try to schedule an engine invocation at the specified [target]. - * - * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the engine invocation should happen. - * @param scheduled The queue of scheduled invocations. - */ - private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { - val head = scheduled.peek() - - // Only schedule a new scheduler invocation in case the target is earlier than all other pending - // scheduler invocations - if (head == null || target < head.timestamp) { - @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context) - scheduled.addFirst(Invocation(target, handle)) - } - } - - /** - * This method is invoked when the earliest timer expires. - */ - private fun doRunTimers() { - val invocations = invocations - val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue - val now = invocation.timestamp - - while (queue.isNotEmpty()) { - val timer = queue.peek() - - val timestamp = timer.timestamp - val isCancelled = timer.isCancelled - - assert(timestamp >= now) { "Found task in the past" } - - if (timestamp > now && !isCancelled) { - // Schedule a task for the next event to occur. - trySchedule(now, invocations, timestamp) - break - } - - queue.poll() - - if (!isCancelled) { - timers.remove(timer.key) - timer() - } - } - } - - /** - * A task that is scheduled to run in the future. - */ - private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> { - /** - * A flag to indicate that the task has been cancelled. - */ - @JvmField - var isCancelled: Boolean = false - - /** - * Run the task. - */ - operator fun invoke(): Unit = block() - - override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp) - } - - /** - * A future engine invocation. - * - * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case - * the invocation is not needed anymore, it can be cancelled via [cancel]. - */ - private class Invocation( - @JvmField val timestamp: Long, - @JvmField val handle: DisposableHandle - ) { - /** - * Cancel the engine invocation. - */ - fun cancel() = handle.dispose() - } -} diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt index 51e36eea..3235b046 100644 --- a/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/PacerTest.kt @@ -28,26 +28,18 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows import org.opendc.simulator.kotlin.runSimulation -import java.time.InstantSource -import kotlin.coroutines.EmptyCoroutineContext /** * Test suite for the [Pacer] class. */ class PacerTest { @Test - fun testEmptyContext() { - assertThrows<IllegalArgumentException> { Pacer(EmptyCoroutineContext, InstantSource.system(), 100) {} } - } - - @Test fun testSingleEnqueue() { var count = 0 runSimulation { - val pacer = Pacer(coroutineContext, timeSource, quantum = 100) { + val pacer = Pacer(dispatcher, /*quantum*/ 100) { count++ } @@ -62,7 +54,7 @@ class PacerTest { var count = 0 runSimulation { - val pacer = Pacer(coroutineContext, timeSource, quantum = 100) { + val pacer = Pacer(dispatcher, /*quantum*/ 100) { count++ } @@ -80,7 +72,7 @@ class PacerTest { var count = 0 runSimulation { - val pacer = Pacer(coroutineContext, timeSource, quantum = 100) { + val pacer = Pacer(dispatcher, /*quantum*/ 100) { count++ } @@ -98,7 +90,7 @@ class PacerTest { var count = 0 runSimulation { - val pacer = Pacer(coroutineContext, timeSource, quantum = 100) { + val pacer = Pacer(dispatcher, /*quantum*/ 100) { count++ } @@ -116,7 +108,7 @@ class PacerTest { var count = 0 runSimulation { - val pacer = Pacer(coroutineContext, timeSource, quantum = 100) { + val pacer = Pacer(dispatcher, /*quantum*/ 100) { count++ } diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt index e8ec97a4..3947fa2e 100644 --- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt @@ -29,22 +29,15 @@ import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.kotlin.runSimulation -import java.time.Clock -import kotlin.coroutines.EmptyCoroutineContext /** * A test suite for the [TimerScheduler] class. */ -internal class TimerSchedulerTest { - @Test - fun testEmptyContext() { - assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) } - } - +class TimerSchedulerTest { @Test fun testBasicTimer() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) scheduler.startSingleTimer(0, 1000) { assertEquals(1000, timeSource.millis()) @@ -58,7 +51,7 @@ internal class TimerSchedulerTest { @Test fun testCancelNonExisting() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) scheduler.cancel(1) } @@ -67,7 +60,7 @@ internal class TimerSchedulerTest { @Test fun testCancelExisting() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) scheduler.startSingleTimer(0, 1000) { fail() @@ -84,7 +77,7 @@ internal class TimerSchedulerTest { @Test fun testCancelAll() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) scheduler.startSingleTimer(0, 1000) { fail() } scheduler.startSingleTimer(1, 100) { fail() } @@ -95,7 +88,7 @@ internal class TimerSchedulerTest { @Test fun testOverride() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) scheduler.startSingleTimer(0, 1000) { fail() } @@ -108,7 +101,7 @@ internal class TimerSchedulerTest { @Test fun testOverrideBlock() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) scheduler.startSingleTimer(0, 1000) { fail() } @@ -121,7 +114,7 @@ internal class TimerSchedulerTest { @Test fun testNegativeDelay() { runSimulation { - val scheduler = TimerScheduler<Int>(coroutineContext, timeSource) + val scheduler = TimerScheduler<Int>(dispatcher) assertThrows<IllegalArgumentException> { scheduler.startSingleTimer(1, -1) { diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 4ced9569..9d7dcba6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,6 +22,7 @@ package org.opendc.compute.service +import org.opendc.common.Dispatcher import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host @@ -29,8 +30,6 @@ import org.opendc.compute.service.internal.ComputeServiceImpl import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Duration -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * The [ComputeService] hosts the API implementation of the OpenDC Compute service. @@ -80,18 +79,16 @@ public interface ComputeService : AutoCloseable { /** * Construct a new [ComputeService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] for scheduling future events. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, scheduler: ComputeScheduler, schedulingQuantum: Duration = Duration.ofMinutes(5) ): ComputeService { - return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum) + return ComputeServiceImpl(dispatcher, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 2b755988..77932545 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.compute.service.internal import mu.KotlinLogging +import org.opendc.common.Dispatcher import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Flavor @@ -37,25 +38,21 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.telemetry.SchedulerStats import java.time.Duration import java.time.Instant -import java.time.InstantSource import java.util.ArrayDeque import java.util.Deque import java.util.Random import java.util.UUID -import kotlin.coroutines.CoroutineContext import kotlin.math.max /** * Internal implementation of the OpenDC Compute service. * - * @param coroutineContext The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] for scheduling future events. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( - coroutineContext: CoroutineContext, - private val clock: InstantSource, + private val dispatcher: Dispatcher, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { @@ -108,6 +105,7 @@ internal class ComputeServiceImpl( override val hosts: Set<Host> get() = hostToView.keys + private val clock = dispatcher.timeSource private var maxCores = 0 private var maxMemory = 0L private var _attemptsSuccess = 0L @@ -120,7 +118,7 @@ internal class ComputeServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() } override fun newClient(): ComputeClient { check(!isClosed) { "Service is already closed" } diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index b790d36f..b5685aba 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -62,12 +62,11 @@ internal class ComputeServiceTest { @BeforeEach fun setUp() { scope = SimulationCoroutineScope() - val clock = scope.timeSource val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - service = ComputeService(scope.coroutineContext, clock, computeScheduler) + service = ComputeService(scope.dispatcher, computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index ac97552f..a496cc99 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -75,7 +75,7 @@ internal class SimHostTest { fun testSingle() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -142,7 +142,7 @@ internal class SimHostTest { fun testOvercommitted() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -229,7 +229,7 @@ internal class SimHostTest { fun testFailure() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt index 66fcca22..eae5806e 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/Provisioner.kt @@ -22,13 +22,12 @@ package org.opendc.experiments.provisioner +import org.opendc.common.Dispatcher import org.opendc.experiments.MutableServiceRegistry import org.opendc.experiments.ServiceRegistry import org.opendc.experiments.internal.ServiceRegistryImpl -import java.time.InstantSource import java.util.ArrayDeque import java.util.SplittableRandom -import kotlin.coroutines.CoroutineContext /** * A helper class to set up the experimental environment in a reproducible manner. @@ -37,17 +36,15 @@ import kotlin.coroutines.CoroutineContext * [ProvisioningStep]s are executed sequentially and ensure that the necessary infrastructure is configured and teared * down after the simulation completes. * - * @param coroutineContext The [CoroutineContext] in which the environment is set up. - * @param clock The simulation clock represented as [InstantSource]. + * @param dispatcher The [Dispatcher] implementation for scheduling future tasks. * @param seed A seed for initializing the randomness of the environment. */ -public class Provisioner(coroutineContext: CoroutineContext, clock: InstantSource, seed: Long) : AutoCloseable { +public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable { /** * Implementation of [ProvisioningContext]. */ private val context = object : ProvisioningContext { - override val clock: InstantSource = clock - override val coroutineContext: CoroutineContext = coroutineContext + override val dispatcher: Dispatcher = dispatcher override val seeder: SplittableRandom = SplittableRandom(seed) override val registry: MutableServiceRegistry = ServiceRegistryImpl() diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt index 7eec6fa4..e53044ce 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/provisioner/ProvisioningContext.kt @@ -22,27 +22,21 @@ package org.opendc.experiments.provisioner +import org.opendc.common.Dispatcher import org.opendc.experiments.MutableServiceRegistry -import java.time.InstantSource import java.util.SplittableRandom import java.util.random.RandomGenerator -import kotlin.coroutines.CoroutineContext /** * The [ProvisioningContext] class provides access to shared state between subsequent [ProvisioningStep]s, as well as - * access to the simulation dispatcher (via [CoroutineContext]), the virtual clock, and a randomness seeder to allow + * access to the simulation dispatcher, the virtual clock, and a randomness seeder to allow * the provisioning steps to initialize the (simulated) resources. */ public interface ProvisioningContext { /** - * The [CoroutineContext] in which the provisioner runs. + * The [Dispatcher] provided by the provisioner to schedule future events during the simulation. */ - public val coroutineContext: CoroutineContext - - /** - * The [InstantSource] tracking the virtual simulation time. - */ - public val clock: InstantSource + public val dispatcher: Dispatcher /** * A [SplittableRandom] instance used to seed the provisioners. diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 08bb2c32..1221f084 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -75,7 +75,7 @@ class CapelinBenchmarks { fun benchmarkCapelin() = runSimulation { val serviceDomain = "compute.opendc.org" - Provisioner(coroutineContext, timeSource, seed = 0).use { provisioner -> + Provisioner(dispatcher, seed = 0).use { provisioner -> val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 1f9f3439..2567a4d5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -64,7 +64,7 @@ public class CapelinRunner( val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - Provisioner(coroutineContext, timeSource, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), setupHosts(serviceDomain, topology, optimize = true) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index aa7d552e..7e01bb64 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -94,7 +94,7 @@ class CapelinIntegrationTest { val topology = createTopology() val monitor = monitor - Provisioner(coroutineContext, timeSource, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -138,7 +138,7 @@ class CapelinIntegrationTest { val topology = createTopology("single") val monitor = monitor - Provisioner(coroutineContext, timeSource, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -177,7 +177,7 @@ class CapelinIntegrationTest { val workload = createTestWorkload(1.0, seed) val topology = createTopology("single") - Provisioner(coroutineContext, timeSource, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), @@ -216,7 +216,7 @@ class CapelinIntegrationTest { val workload = createTestWorkload(0.25, seed) val monitor = monitor - Provisioner(coroutineContext, timeSource, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt index 38cbf2dc..d7347327 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/ComputeServiceProvisioningStep.kt @@ -41,7 +41,7 @@ public class ComputeServiceProvisioningStep internal constructor( private val schedulingQuantum: Duration ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val service = ComputeService(ctx.coroutineContext, ctx.clock, scheduler(ctx), schedulingQuantum) + val service = ComputeService(ctx.dispatcher, scheduler(ctx), schedulingQuantum) ctx.registry.register(serviceDomain, ComputeService::class.java, service) return AutoCloseable { service.close() } diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt index e224fb84..310aa54c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/HostsProvisioningStep.kt @@ -46,7 +46,7 @@ public class HostsProvisioningStep internal constructor( ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val engine = FlowEngine.create(ctx.coroutineContext, ctx.clock) + val engine = FlowEngine.create(ctx.dispatcher) val graph = engine.newGraph() val hosts = mutableSetOf<SimHost>() @@ -58,7 +58,7 @@ public class HostsProvisioningStep internal constructor( spec.uid, spec.name, spec.meta, - ctx.clock, + ctx.dispatcher.timeSource, machine, hypervisor, optimize = optimize diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt index f39f74bc..efd38a3c 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMetricReader.kt @@ -27,6 +27,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import mu.KotlinLogging +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host @@ -37,26 +39,25 @@ import org.opendc.experiments.compute.telemetry.table.ServerTableReader import org.opendc.experiments.compute.telemetry.table.ServiceTableReader import java.time.Duration import java.time.Instant -import java.time.InstantSource /** * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every * export interval. * - * @param scope The [CoroutineScope] to run the reader in. - * @param clock The virtual clock. + * @param dispatcher A [Dispatcher] for scheduling the future events. * @param service The [ComputeService] to monitor. * @param monitor The monitor to export the metrics to. * @param exportInterval The export interval. */ public class ComputeMetricReader( - scope: CoroutineScope, - clock: InstantSource, + dispatcher: Dispatcher, private val service: ComputeService, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) + private val clock = dispatcher.timeSource /** * Aggregator for service metrics. diff --git a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt index 68ca5ae8..665611dd 100644 --- a/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-compute/src/main/kotlin/org/opendc/experiments/compute/telemetry/ComputeMonitorProvisioningStep.kt @@ -22,9 +22,6 @@ package org.opendc.experiments.compute.telemetry -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import org.opendc.compute.service.ComputeService import org.opendc.experiments.provisioner.ProvisioningContext import org.opendc.experiments.provisioner.ProvisioningStep @@ -40,13 +37,8 @@ public class ComputeMonitorProvisioningStep internal constructor( private val exportInterval: Duration ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { - val scope = CoroutineScope(ctx.coroutineContext + Job()) val service = requireNotNull(ctx.registry.resolve(serviceDomain, ComputeService::class.java)) { "Compute service $serviceDomain does not exist" } - val metricReader = ComputeMetricReader(scope, ctx.clock, service, monitor, exportInterval) - - return AutoCloseable { - metricReader.close() - scope.cancel() - } + val metricReader = ComputeMetricReader(ctx.dispatcher, service, monitor, exportInterval) + return AutoCloseable { metricReader.close() } } } diff --git a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt index 3b4200c8..e5c2f86a 100644 --- a/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-faas/src/main/kotlin/org/opendc/experiments/faas/FaaSServiceProvisioningStep.kt @@ -56,10 +56,9 @@ public class FaaSServiceProvisioningStep internal constructor( } else { ZeroDelayInjector } - val deployer = SimFunctionDeployer(ctx.coroutineContext, ctx.clock, machineModel, delayInjector) + val deployer = SimFunctionDeployer(ctx.dispatcher, machineModel, delayInjector) val service = FaaSService( - ctx.coroutineContext, - ctx.clock, + ctx.dispatcher, deployer, routingPolicy(ctx), terminationPolicy(ctx) diff --git a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt index ff825260..4a4d9ae0 100644 --- a/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt +++ b/opendc-experiments/opendc-experiments-faas/src/test/kotlin/org/opendc/experiments/faas/FaaSExperiment.kt @@ -49,12 +49,12 @@ class FaaSExperiment { fun testSmoke() = runSimulation { val faasService = "faas.opendc.org" - Provisioner(coroutineContext, timeSource, seed = 0L).use { provisioner -> + Provisioner(dispatcher, seed = 0L).use { provisioner -> provisioner.runStep( setupFaaSService( faasService, { RandomRoutingPolicy() }, - { FunctionTerminationPolicyFixed(it.coroutineContext, it.clock, timeout = Duration.ofMinutes(10)) }, + { FunctionTerminationPolicyFixed(it.dispatcher, timeout = Duration.ofMinutes(10)) }, createMachineModel(), coldStartModel = ColdStartModel.GOOGLE ) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 51f6e763..53bf5aa6 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,12 +22,9 @@ package org.opendc.experiments.tf20.core -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext @@ -36,17 +33,14 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.CpuPowerModel -import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.FlowStage import org.opendc.simulator.flow2.FlowStageLogic import org.opendc.simulator.flow2.OutPort -import java.time.InstantSource import java.util.ArrayDeque import java.util.UUID import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.math.ceil import kotlin.math.roundToLong @@ -57,22 +51,16 @@ import kotlin.math.roundToLong public class SimTFDevice( override val uid: UUID, override val isGpu: Boolean, - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: CpuPowerModel ) : TFDevice { /** - * The scope in which the device runs. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine.create( - FlowEngine.create(context, clock).newGraph(), + FlowEngine.create(dispatcher).newGraph(), MachineModel(listOf(pu), listOf(memory)), SimPsuFactories.simple(powerModel) ) @@ -162,9 +150,7 @@ public class SimTFDevice( } init { - scope.launch { - machine.runWorkload(workload) - } + machine.startWorkload(workload, emptyMap()) {} } override suspend fun load(dataSize: Long) { @@ -185,7 +171,6 @@ public class SimTFDevice( override fun close() { machine.cancel() - scope.cancel() } private data class Work(var flops: Double, val cont: Continuation<Unit>) { diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 6fcdf513..5b408fb3 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -23,19 +23,18 @@ package org.opendc.experiments.tf20.network import kotlinx.coroutines.channels.Channel +import org.opendc.common.Dispatcher import org.opendc.common.util.TimerScheduler -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * The network controller represents a simple network model between the worker and master nodes during * TensorFlow execution. */ -public class NetworkController(context: CoroutineContext, clock: InstantSource) : AutoCloseable { +public class NetworkController(dispatcher: Dispatcher) : AutoCloseable { /** * The scheduler for the message. */ - private val scheduler = TimerScheduler<Message>(context, clock) + private val scheduler = TimerScheduler<Message>(dispatcher) /** * The outbound communication channels. diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt index d01a4a3c..899aafc0 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt @@ -48,8 +48,7 @@ class TensorFlowTest { val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - timeSource, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -83,8 +82,7 @@ class TensorFlowTest { val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - timeSource, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -118,8 +116,7 @@ class TensorFlowTest { val deviceA = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, - coroutineContext, - timeSource, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) @@ -128,8 +125,7 @@ class TensorFlowTest { val deviceB = SimTFDevice( UUID.randomUUID(), def.meta["gpu"] as Boolean, - coroutineContext, - timeSource, + dispatcher, def.model.cpus[0], def.model.memory[0], CpuPowerModels.linear(250.0, 60.0) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 9f15eab6..549c6f3e 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -47,8 +47,7 @@ internal class SimTFDeviceTest { val device = SimTFDevice( UUID.randomUUID(), isGpu = true, - coroutineContext, - timeSource, + dispatcher, pu, memory, CpuPowerModels.linear(250.0, 100.0) diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt index 5cee9abf..fe4fde17 100644 --- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt @@ -47,8 +47,7 @@ public class WorkflowServiceProvisioningStep internal constructor( val client = computeService.newClient() val service = WorkflowService( - ctx.coroutineContext, - ctx.clock, + ctx.dispatcher, client, scheduler.schedulingQuantum, jobAdmissionPolicy = scheduler.jobAdmissionPolicy, diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 53706c57..96619cdb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,6 +22,7 @@ package org.opendc.faas.service +import org.opendc.common.Dispatcher import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -31,8 +32,6 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Duration -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * The [FaaSService] hosts the service implementation of the OpenDC FaaS platform. @@ -62,22 +61,20 @@ public interface FaaSService : AutoCloseable { /** * Construct a new [FaaSService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. + * @param dispatcher The [Dispatcher] used for scheduling events. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) + return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index f494adb1..a2c371e1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -22,12 +22,11 @@ package org.opendc.faas.service.autoscaler +import org.opendc.common.Dispatcher import org.opendc.common.util.TimerScheduler import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState import java.time.Duration -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time. @@ -35,14 +34,13 @@ import kotlin.coroutines.CoroutineContext * @param timeout The idle timeout after which the function instance is terminated. */ public class FunctionTerminationPolicyFixed( - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. */ - private val scheduler = TimerScheduler<FunctionInstance>(context, clock) + private val scheduler = TimerScheduler<FunctionInstance>(dispatcher) override fun enqueue(instance: FunctionInstance) { // Cancel the existing timeout timer @@ -61,6 +59,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. */ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } + scheduler.startSingleTimer(instance, timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 3235ff1a..b1e6b3f5 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,13 +22,11 @@ package org.opendc.faas.service.internal -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.suspendCancellableCoroutine import mu.KotlinLogging +import org.opendc.common.Dispatcher import org.opendc.common.util.Pacer import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction @@ -49,7 +47,6 @@ import java.util.ArrayDeque import java.util.Random import java.util.UUID import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resumeWithException /** @@ -60,19 +57,13 @@ import kotlin.coroutines.resumeWithException * this component queues the events to await the deployment of new instances. */ internal class FaaSServiceImpl( - context: CoroutineContext, - private val clock: InstantSource, + dispatcher: Dispatcher, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy, quantum: Duration ) : FaaSService, FunctionInstanceListener { /** - * The [CoroutineScope] of the service bounded by the lifecycle of the service. - */ - private val scope = CoroutineScope(context + Job()) - - /** * The logger instance of this server. */ private val logger = KotlinLogging.logger {} @@ -80,7 +71,12 @@ internal class FaaSServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, quantum.toMillis()) { doSchedule() } + + /** + * The [InstantSource] instance representing the clock. + */ + private val clock = dispatcher.timeSource /** * The [Random] instance used to generate unique identifiers for the objects. @@ -266,8 +262,6 @@ internal class FaaSServiceImpl( } override fun close() { - scope.cancel() - // Stop all function instances for ((_, function) in functions) { function.close() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index e29864da..9676744b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,7 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runSimulation { - val service = FaaSService(coroutineContext, timeSource, mockk(), mockk(), mockk()) + val service = FaaSService(dispatcher, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +155,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runSimulation { val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, timeSource, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(dispatcher, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 9ec26d5d..47b4d4fa 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -31,6 +31,8 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.faas.service.FunctionObject import org.opendc.faas.service.deployer.FunctionDeployer import org.opendc.faas.service.deployer.FunctionInstance @@ -44,10 +46,8 @@ import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.flow2.FlowEngine -import java.time.InstantSource import java.util.ArrayDeque import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException @@ -55,8 +55,7 @@ import kotlin.coroutines.resumeWithException * A [FunctionDeployer] that uses that simulates the [FunctionInstance]s. */ public class SimFunctionDeployer( - context: CoroutineContext, - private val clock: InstantSource, + private val dispatcher: Dispatcher, private val model: MachineModel, private val delayInjector: DelayInjector, private val mapper: SimFaaSWorkloadMapper = SimMetaFaaSWorkloadMapper() @@ -64,7 +63,7 @@ public class SimFunctionDeployer( /** * The [CoroutineScope] of this deployer. */ - private val scope = CoroutineScope(context + Job()) + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + Job()) override fun deploy(function: FunctionObject, listener: FunctionInstanceListener): Instance { val instance = Instance(function, listener) @@ -86,7 +85,7 @@ public class SimFunctionDeployer( * The machine that will execute the workloads. */ public val machine: SimMachine = SimBareMetalMachine.create( - FlowEngine.create(scope.coroutineContext, clock).newGraph(), + FlowEngine.create(dispatcher).newGraph(), model ) diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index e51c3019..be133ded 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -73,13 +73,12 @@ internal class SimFaaSServiceTest { }) val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) - val deployer = SimFunctionDeployer(coroutineContext, timeSource, machineModel, delayInjector) { workload } + val deployer = SimFunctionDeployer(dispatcher, machineModel, delayInjector) { workload } val service = FaaSService( - coroutineContext, - timeSource, + dispatcher, deployer, RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(coroutineContext, timeSource, timeout = Duration.ofMillis(10000)) + FunctionTerminationPolicyFixed(dispatcher, timeout = Duration.ofMillis(10000)) ) val client = service.newClient() diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index b761598b..eea46b95 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -74,7 +74,7 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkBareMetal() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) return@runSimulation machine.runWorkload(trace.createWorkload(0)) @@ -84,7 +84,7 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkSpaceSharedHypervisor() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1)) @@ -105,7 +105,7 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkFairShareHypervisorSingle() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) @@ -126,7 +126,7 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkFairShareHypervisorDouble() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 646d687d..58b01e06 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -72,7 +72,7 @@ class SimMachineTest { @Test fun testFlopsWorkload() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -97,7 +97,7 @@ class SimMachineTest { } val trace = builder.build() - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( graph, @@ -112,7 +112,7 @@ class SimMachineTest { @Test fun testDualSocketMachine() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val cpuNode = machineModel.cpus[0].node @@ -133,7 +133,7 @@ class SimMachineTest { @Test fun testPower() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( graph, @@ -156,7 +156,7 @@ class SimMachineTest { @Test fun testCapacityClamp() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -184,7 +184,7 @@ class SimMachineTest { @Test fun testMemory() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -206,7 +206,7 @@ class SimMachineTest { @Test fun testMemoryUsage() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -230,7 +230,7 @@ class SimMachineTest { @Test fun testNetUsage() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -258,7 +258,7 @@ class SimMachineTest { @Test fun testDiskReadUsage() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -283,7 +283,7 @@ class SimMachineTest { @Test fun testDiskWriteUsage() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -308,7 +308,7 @@ class SimMachineTest { @Test fun testCancellation() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -330,7 +330,7 @@ class SimMachineTest { @Test fun testConcurrentRuns() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -351,7 +351,7 @@ class SimMachineTest { @Test fun testCatchStartFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -367,7 +367,7 @@ class SimMachineTest { @Test fun testCatchStopFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -384,7 +384,7 @@ class SimMachineTest { @Test fun testCatchShutdownFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -400,7 +400,7 @@ class SimMachineTest { @Test fun testCatchNestedFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index f60ff67c..99f47b2f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -74,7 +74,7 @@ internal class SimFairShareHypervisorTest { SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ).createWorkload(0) - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, model) @@ -118,7 +118,7 @@ internal class SimFairShareHypervisorTest { SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1) ).createWorkload(0) - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, model) @@ -157,7 +157,7 @@ internal class SimFairShareHypervisorTest { /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, model) @@ -184,7 +184,7 @@ internal class SimFairShareHypervisorTest { .addGroup(setOf("a", "n"), 0.1, 0.8) .build() - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, model) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 31718794..93b67aa3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest { SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ).createWorkload(0) - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -99,7 +99,7 @@ internal class SimSpaceSharedHypervisorTest { fun testRuntimeWorkload() = runSimulation { val duration = 5 * 60L * 1000 val workload = SimWorkloads.runtime(duration, 1.0) - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -123,7 +123,7 @@ internal class SimSpaceSharedHypervisorTest { fun testFlopsWorkload() = runSimulation { val duration = 5 * 60L * 1000 val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0) - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -144,7 +144,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testTwoWorkloads() = runSimulation { val duration = 5 * 60L * 1000 - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -173,7 +173,7 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadFails() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) @@ -200,7 +200,7 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt index c208a2af..08bb6509 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -59,7 +59,7 @@ class SimChainWorkloadTest { @Test fun testMultipleWorkloads() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -80,7 +80,7 @@ class SimChainWorkloadTest { @Test fun testStartFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -105,7 +105,7 @@ class SimChainWorkloadTest { @Test fun testStartFailureSecond() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -131,7 +131,7 @@ class SimChainWorkloadTest { @Test fun testStopFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -155,7 +155,7 @@ class SimChainWorkloadTest { @Test fun testStopFailureSecond() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -180,7 +180,7 @@ class SimChainWorkloadTest { @Test fun testStartAndStopFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -206,7 +206,7 @@ class SimChainWorkloadTest { @Test fun testShutdownAndStopFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -232,7 +232,7 @@ class SimChainWorkloadTest { @Test fun testShutdownAndStartFailure() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -260,7 +260,7 @@ class SimChainWorkloadTest { @Test fun testSnapshot() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create(graph, machineModel) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index c0bdfd25..5c888fbc 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -53,7 +53,7 @@ class SimTraceWorkloadTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -76,7 +76,7 @@ class SimTraceWorkloadTest { @Test fun testOffset() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -99,7 +99,7 @@ class SimTraceWorkloadTest { @Test fun testSkipFragment() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( @@ -123,7 +123,7 @@ class SimTraceWorkloadTest { @Test fun testZeroCores() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val machine = SimBareMetalMachine.create( diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts index 04d46607..4f04bdc1 100644 --- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts @@ -28,8 +28,8 @@ plugins { } dependencies { - api(libs.kotlinx.coroutines) - implementation(libs.kotlin.logging) + api(projects.opendc.opendcCommon) + implementation(libs.slf4j.api) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(libs.slf4j.simple) diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt index 5a67c7d2..59dd3bad 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt @@ -60,7 +60,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkSink() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 4200.0f) val source = TraceFlowSource(graph, trace) @@ -71,7 +71,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkForward() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 4200.0f) val source = TraceFlowSource(graph, trace) @@ -85,7 +85,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkMuxMaxMinSingleSource() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = MaxMinFlowMultiplexer(graph) @@ -103,7 +103,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkMuxMaxMinTripleSource() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = MaxMinFlowMultiplexer(graph) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java index cfa5a48f..c0f52505 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java @@ -26,9 +26,8 @@ import java.time.Clock; import java.time.InstantSource; import java.util.ArrayList; import java.util.List; -import kotlin.coroutines.ContinuationInterceptor; import kotlin.coroutines.CoroutineContext; -import kotlinx.coroutines.Delay; +import org.opendc.common.Dispatcher; /** * A {@link FlowEngine} simulates a generic flow network. @@ -57,23 +56,19 @@ public final class FlowEngine implements Runnable { */ private boolean active; - private final CoroutineContext coroutineContext; + private final Dispatcher dispatcher; private final InstantSource clock; - private final Delay delay; /** * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}. */ - public static FlowEngine create(CoroutineContext coroutineContext, InstantSource clock) { - return new FlowEngine(coroutineContext, clock); + public static FlowEngine create(Dispatcher dispatcher) { + return new FlowEngine(dispatcher); } - FlowEngine(CoroutineContext coroutineContext, InstantSource clock) { - this.coroutineContext = coroutineContext; - this.clock = clock; - - CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key; - this.delay = (Delay) coroutineContext.get(key); + FlowEngine(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + this.clock = dispatcher.getTimeSource(); } /** @@ -205,7 +200,7 @@ public final class FlowEngine implements Runnable { // Only schedule a new scheduler invocation in case the target is earlier than all other pending // scheduler invocations if (scheduled.tryAdd(target)) { - delay.invokeOnTimeout(target - now, this, coroutineContext); + dispatcher.schedule(target - now, this); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt index b5054375..467bf334 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt @@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation class FlowEngineTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val multiplexer = MaxMinFlowMultiplexer(graph) @@ -55,7 +55,7 @@ class FlowEngineTest { @Test fun testConnectInvalidInlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val inlet = mockk<Inlet>() @@ -65,7 +65,7 @@ class FlowEngineTest { @Test fun testConnectInvalidOutlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val outlet = mockk<Outlet>() @@ -75,7 +75,7 @@ class FlowEngineTest { @Test fun testConnectInletBelongsToDifferentGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -87,7 +87,7 @@ class FlowEngineTest { @Test fun testConnectOutletBelongsToDifferentGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -99,7 +99,7 @@ class FlowEngineTest { @Test fun testConnectInletAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 2.0f) @@ -112,7 +112,7 @@ class FlowEngineTest { @Test fun testConnectOutletAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sinkA = SimpleFlowSink(graph, 2.0f) @@ -125,7 +125,7 @@ class FlowEngineTest { @Test fun testDisconnectInletInvalid() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val inlet = mockk<Inlet>() @@ -134,7 +134,7 @@ class FlowEngineTest { @Test fun testDisconnectOutletInvalid() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val outlet = mockk<Outlet>() @@ -143,7 +143,7 @@ class FlowEngineTest { @Test fun testDisconnectInletInvalidGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -154,7 +154,7 @@ class FlowEngineTest { @Test fun testDisconnectOutletInvalidGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -165,7 +165,7 @@ class FlowEngineTest { @Test fun testInletEquality() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sinkA = SimpleFlowSink(graph, 2.0f) @@ -181,7 +181,7 @@ class FlowEngineTest { @Test fun testOutletEquality() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt index d7a2190f..fef49786 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt @@ -39,7 +39,7 @@ class ForwardingFlowMultiplexerTest { */ @Test fun testTrace() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = ForwardingFlowMultiplexer(graph) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt index 635b1d98..ebae2d4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation class MaxMinFlowMultiplexerTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = MaxMinFlowMultiplexer(graph) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt index d50a40b0..ea516c63 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt @@ -37,7 +37,7 @@ import java.util.concurrent.ThreadLocalRandom class FlowSinkTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -51,7 +51,7 @@ class FlowSinkTest { @Test fun testAdjustCapacity() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -69,7 +69,7 @@ class FlowSinkTest { @Test fun testUtilization() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -83,7 +83,7 @@ class FlowSinkTest { @Test fun testFragments() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -114,7 +114,7 @@ class FlowSinkTest { ) return runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 4200.0f) val source = TraceFlowSource(graph, trace) diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index c1a558b8..181d9a20 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -43,7 +43,7 @@ import org.opendc.simulator.kotlin.runSimulation class SimNetworkSinkTest { @Test fun testInitialState() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) @@ -56,7 +56,7 @@ class SimNetworkSinkTest { @Test fun testDisconnectIdempotent() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) @@ -66,7 +66,7 @@ class SimNetworkSinkTest { @Test fun testConnectCircular() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) @@ -77,7 +77,7 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnectedTarget() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) val source = mockk<SimNetworkPort>(relaxUnitFun = true) @@ -90,7 +90,7 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) val source1 = TestSource(graph) @@ -107,7 +107,7 @@ class SimNetworkSinkTest { @Test fun testConnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) val source = TestSource(graph) @@ -127,7 +127,7 @@ class SimNetworkSinkTest { @Test fun testDisconnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) val source = TestSource(graph) diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt index e45b1bd7..4a489478 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt @@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation class SimNetworkSwitchVirtualTest { @Test fun testConnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) val source = TestSource(graph) @@ -60,7 +60,7 @@ class SimNetworkSwitchVirtualTest { @Test fun testConnectClosedPort() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimNetworkSink(graph, /*capacity*/ 100.0f) val switch = SimNetworkSwitchVirtual(graph) diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index 2e0dc5c4..f596ca4e 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation internal class SimPduTest { @Test fun testZeroOutlets() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val pdu = SimPdu(graph) @@ -48,7 +48,7 @@ internal class SimPduTest { @Test fun testSingleOutlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val pdu = SimPdu(graph) @@ -62,7 +62,7 @@ internal class SimPduTest { @Test fun testDoubleOutlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 200.0f) val pdu = SimPdu(graph) @@ -78,7 +78,7 @@ internal class SimPduTest { @Test fun testDisconnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 300.0f) val pdu = SimPdu(graph) @@ -95,7 +95,7 @@ internal class SimPduTest { @Test fun testLoss() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 500.0f) // https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN @@ -110,7 +110,7 @@ internal class SimPduTest { @Test fun testOutletClose() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val pdu = SimPdu(graph) diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index 0f145592..03c942b4 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -42,7 +42,7 @@ import org.opendc.simulator.kotlin.runSimulation internal class SimPowerSourceTest { @Test fun testInitialState() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) @@ -57,7 +57,7 @@ internal class SimPowerSourceTest { @Test fun testDisconnectIdempotent() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) @@ -67,7 +67,7 @@ internal class SimPowerSourceTest { @Test fun testConnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val inlet = TestInlet(graph) @@ -87,7 +87,7 @@ internal class SimPowerSourceTest { @Test fun testDisconnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val inlet = TestInlet(graph) @@ -102,7 +102,7 @@ internal class SimPowerSourceTest { @Test fun testDisconnectAssertion() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) @@ -120,7 +120,7 @@ internal class SimPowerSourceTest { @Test fun testOutletAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val inlet = TestInlet(graph) @@ -135,7 +135,7 @@ internal class SimPowerSourceTest { @Test fun testInletAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 100.0f) val inlet = mockk<SimPowerInlet>(relaxUnitFun = true) diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index 4ce83fe9..89fede63 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation internal class SimUpsTest { @Test fun testSingleInlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 200.0f) val ups = SimUps(graph) @@ -49,7 +49,7 @@ internal class SimUpsTest { @Test fun testDoubleInlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source1 = SimPowerSource(graph, /*capacity*/ 200.0f) val source2 = SimPowerSource(graph, /*capacity*/ 200.0f) @@ -69,7 +69,7 @@ internal class SimUpsTest { @Test fun testLoss() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source = SimPowerSource(graph, /*capacity*/ 500.0f) // https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN @@ -84,7 +84,7 @@ internal class SimUpsTest { @Test fun testDisconnect() = runSimulation { - val engine = FlowEngine.create(coroutineContext, timeSource) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val source1 = SimPowerSource(graph, /*capacity*/ 200.0f) val source2 = SimPowerSource(graph, /*capacity*/ 200.0f) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 4c6fe755..86c1c521 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -260,7 +260,7 @@ public class OpenDCRunner( val scenario = scenario - Provisioner(coroutineContext, timeSource, seed).use { provisioner -> + Provisioner(dispatcher, seed).use { provisioner -> provisioner.runSteps( setupComputeService( serviceDomain, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index f0e86449..07b43b6d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import org.opendc.common.Dispatcher import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -31,8 +32,6 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Duration -import java.time.InstantSource -import kotlin.coroutines.CoroutineContext /** * A service for cloud workflow execution. @@ -59,9 +58,7 @@ public interface WorkflowService : AutoCloseable { /** * Construct a new [WorkflowService] implementation. * - * @param context The [CoroutineContext] to use in the service. - * @param clock The clock instance to use. - * @param meterProvider The meter provider to use. + * @param dispatcher A [Dispatcher] to schedule future events. * @param compute The "Compute" client to use. * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. @@ -70,8 +67,7 @@ public interface WorkflowService : AutoCloseable { * @param taskOrderPolicy The task order policy to use. */ public operator fun invoke( - context: CoroutineContext, - clock: InstantSource, + dispatcher: Dispatcher, compute: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -80,8 +76,7 @@ public interface WorkflowService : AutoCloseable { taskOrderPolicy: TaskOrderPolicy ): WorkflowService { return WorkflowServiceImpl( - context, - clock, + dispatcher, compute, schedulingQuantum, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 20e30fd4..01c1f565 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -26,6 +26,8 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.common.Dispatcher +import org.opendc.common.asCoroutineDispatcher import org.opendc.common.util.Pacer import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Image @@ -44,7 +46,6 @@ import java.time.Duration import java.time.InstantSource import java.util.PriorityQueue import java.util.Queue -import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume /** @@ -52,8 +53,7 @@ import kotlin.coroutines.resume * Datacenter Scheduling. */ public class WorkflowServiceImpl( - context: CoroutineContext, - private val clock: InstantSource, + dispatcher: Dispatcher, private val computeClient: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -64,7 +64,12 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - private val scope = CoroutineScope(context + kotlinx.coroutines.Job()) + private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher() + kotlinx.coroutines.Job()) + + /** + * The [InstantSource] representing the clock of this service. + */ + private val clock = dispatcher.timeSource /** * The incoming jobs ready to be processed by the scheduler. @@ -149,7 +154,7 @@ public class WorkflowServiceImpl( /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } + private val pacer = Pacer(dispatcher, schedulingQuantum.toMillis()) { doSchedule() } private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index f5edbb2f..e5e05a92 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -70,7 +70,7 @@ internal class WorkflowServiceTest { val computeService = "compute.opendc.org" val workflowService = "workflow.opendc.org" - Provisioner(coroutineContext, timeSource, seed = 0L).use { provisioner -> + Provisioner(dispatcher, seed = 0L).use { provisioner -> val scheduler: (ProvisioningContext) -> ComputeScheduler = { FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), |
