diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-21 17:00:09 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-04-21 17:00:09 +0200 |
| commit | 10dfc257de65cbbd1e25d1d7f5833bfb687d85ed (patch) | |
| tree | 48b102c4d45b366abd8d0d368c31d6c0596ac30f /simulator/opendc-simulator | |
| parent | b4d1289bbd9539c041e8aeb39bb8962628399809 (diff) | |
| parent | 62678b2890a7f3640836b99ca2fec9efd7485929 (diff) | |
simulator: Introduce SimulationCoroutineDispatcher (#120)
This change introduces the SimulationCoroutineDispatcher implementation which replaces the TestCoroutineDispatcher for running single-threaded simulations.
Diffstat (limited to 'simulator/opendc-simulator')
17 files changed, 428 insertions, 206 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index c2a29f5b..7b97a665 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -25,8 +25,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.test.runBlockingTest import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit @@ -34,7 +32,8 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.simulator.core.SimulationCoroutineScope +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.utils.TimerScheduler import org.openjdk.jmh.annotations.* import java.time.Clock @@ -46,15 +45,14 @@ import java.util.concurrent.TimeUnit @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { - private lateinit var scope: TestCoroutineScope + private lateinit var scope: SimulationCoroutineScope private lateinit var clock: Clock private lateinit var scheduler: TimerScheduler<Any> private lateinit var machineModel: SimMachineModel @Setup fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) + scope = SimulationCoroutineScope() scheduler = TimerScheduler(scope.coroutineContext, clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -77,18 +75,18 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkBareMetal(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - return@runBlockingTest machine.run(state.workloads[0]) + return@runBlockingSimulation machine.run(state.workloads[0]) } } @Benchmark fun benchmarkSpaceSharedHypervisor(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) @@ -100,7 +98,7 @@ class SimMachineBenchmarks { val vm = hypervisor.createMachine(machineModel) try { - return@runBlockingTest vm.run(state.workloads[0]) + return@runBlockingSimulation vm.run(state.workloads[0]) } finally { vm.close() machine.close() @@ -110,7 +108,7 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkFairShareHypervisorSingle(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) @@ -122,7 +120,7 @@ class SimMachineBenchmarks { val vm = hypervisor.createMachine(machineModel) try { - return@runBlockingTest vm.run(state.workloads[0]) + return@runBlockingSimulation vm.run(state.workloads[0]) } finally { vm.close() machine.close() @@ -132,7 +130,7 @@ class SimMachineBenchmarks { @Benchmark fun benchmarkFairShareHypervisorDouble(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 67295dfd..a067dd2e 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -39,7 +38,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.simulator.core.runBlockingSimulation /** * Test suite for the [SimHypervisor] class. @@ -61,8 +60,7 @@ internal class SimHypervisorTest { * Test overcommitting of resources via the hypervisor with a single VM. */ @Test - fun testOvercommittedSingle() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testOvercommittedSingle() = runBlockingSimulation { val listener = object : SimHypervisor.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L @@ -116,7 +114,7 @@ internal class SimHypervisorTest { { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), res) { "VM usage is correct" } }, - { assertEquals(1200000, currentTime) { "Current time is correct" } } + { assertEquals(1200000, clock.millis()) { "Current time is correct" } } ) } @@ -124,8 +122,7 @@ internal class SimHypervisorTest { * Test overcommitting of resources via the hypervisor with two VMs. */ @Test - fun testOvercommittedDual() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testOvercommittedDual() = runBlockingSimulation { val listener = object : SimHypervisor.Listener { var totalRequestedWork = 0L var totalGrantedWork = 0L @@ -195,7 +192,7 @@ internal class SimHypervisorTest { { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, currentTime) } + { assertEquals(1200000, clock.millis()) } ) } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index d88dec52..205f2eca 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -37,7 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.simulator.core.runBlockingSimulation /** * Test suite for the [SimBareMetalMachine] class. @@ -57,23 +56,21 @@ class SimMachineTest { } @Test - fun testFlopsWorkload() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testFlopsWorkload() = runBlockingSimulation { val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, currentTime) + assertEquals(1000, clock.millis()) } finally { machine.close() } } @Test - fun testDualSocketMachine() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testDualSocketMachine() = runBlockingSimulation { val cpuNode = machineModel.cpus[0].node val machineModel = SimMachineModel( cpus = List(cpuNode.coreCount * 2) { ProcessingUnit(cpuNode, it % 2, 1000.0) }, @@ -85,15 +82,14 @@ class SimMachineTest { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) // Two sockets with two cores execute 2000 MFlOps per second (500 ms) - assertEquals(500, currentTime) + assertEquals(500, clock.millis()) } finally { machine.close() } } @Test - fun testUsage() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testUsage() = runBlockingSimulation { val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) val res = mutableListOf<Double>() @@ -110,8 +106,7 @@ class SimMachineTest { } @Test - fun testClose() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testClose() = runBlockingSimulation { val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) machine.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index 51e4305a..ef6f536d 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach @@ -40,7 +39,7 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.simulator.core.runBlockingSimulation /** * A test suite for the [SimSpaceSharedHypervisor]. @@ -62,8 +61,7 @@ internal class SimSpaceSharedHypervisorTest { * Test a trace workload. */ @Test - fun testTrace() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testTrace() = runBlockingSimulation { val usagePm = mutableListOf<Double>() val usageVm = mutableListOf<Double>() @@ -103,7 +101,7 @@ internal class SimSpaceSharedHypervisorTest { { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } }, // Temporary limitation is that VMs do not emit usage information // { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } }, - { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } } + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } ) } @@ -111,8 +109,7 @@ internal class SimSpaceSharedHypervisorTest { * Test runtime workload on hypervisor. */ @Test - fun testRuntimeWorkload() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testRuntimeWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) val machine = SimBareMetalMachine( @@ -128,16 +125,14 @@ internal class SimSpaceSharedHypervisorTest { vm.close() machine.close() - assertEquals(duration, currentTime) { "Took enough time" } + assertEquals(duration, clock.millis()) { "Took enough time" } } /** * Test FLOPs workload on hypervisor. */ @Test - fun testFlopsWorkload() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) - + fun testFlopsWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) val machine = SimBareMetalMachine( @@ -152,15 +147,14 @@ internal class SimSpaceSharedHypervisorTest { vm.run(workload) machine.close() - assertEquals(duration, currentTime) { "Took enough time" } + assertEquals(duration, clock.millis()) { "Took enough time" } } /** * Test two workloads running sequentially. */ @Test - fun testTwoWorkloads() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), @@ -180,15 +174,14 @@ internal class SimSpaceSharedHypervisorTest { vm2.close() machine.close() - assertEquals(duration * 2, currentTime) { "Took enough time" } + assertEquals(duration * 2, clock.millis()) { "Took enough time" } } /** * Test concurrent workloads on the machine. */ @Test - fun testConcurrentWorkloadFails() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testConcurrentWorkloadFails() = runBlockingSimulation { val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) @@ -212,8 +205,7 @@ internal class SimSpaceSharedHypervisorTest { * Test concurrent workloads on the machine. */ @Test - fun testConcurrentWorkloadSucceeds() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { val machine = SimBareMetalMachine( coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) diff --git a/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts b/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts index 309afb19..3ba0d8c3 100644 --- a/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts +++ b/simulator/opendc-simulator/opendc-simulator-core/build.gradle.kts @@ -29,5 +29,5 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) - api("org.jetbrains.kotlinx:kotlinx-coroutines-test") + api("org.jetbrains.kotlinx:kotlinx-coroutines-core") } diff --git a/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt new file mode 100644 index 00000000..9b284c11 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Executes a [body] inside an immediate execution dispatcher. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineContext, body: suspend SimulationCoroutineScope.() -> Unit) { + val (safeContext, dispatcher) = context.checkArguments() + val startingJobs = safeContext.activeJobs() + val scope = SimulationCoroutineScope(safeContext) + val deferred = scope.async { + body(scope) + } + dispatcher.advanceUntilIdle() + deferred.getCompletionExceptionOrNull()?.let { + throw it + } + val endingJobs = safeContext.activeJobs() + if ((endingJobs - startingJobs).isNotEmpty()) { + throw IllegalStateException("Test finished with active jobs: $endingJobs") + } +} + +/** + * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope]. + */ +public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runBlockingSimulation(coroutineContext, block) + +/** + * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher]. + */ +public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runBlockingSimulation(this, block) + +private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, SimulationController> { + val dispatcher = get(ContinuationInterceptor).run { + this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } } + this ?: SimulationCoroutineDispatcher() + } + + val job = get(Job) ?: SupervisorJob() + return Pair(this + dispatcher + job, dispatcher as SimulationController) +} + +private fun CoroutineContext.activeJobs(): Set<Job> { + return checkNotNull(this[Job]).children.filter { it.isActive }.toSet() +} diff --git a/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt index 84c18e87..2b670b91 100644 --- a/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/utils/DelayControllerClockAdapter.kt +++ b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,27 +20,27 @@ * SOFTWARE. */ -package org.opendc.simulator.utils +package org.opendc.simulator.core -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.DelayController +import kotlinx.coroutines.CoroutineDispatcher import java.time.Clock -import java.time.Instant -import java.time.ZoneId /** - * A virtual [Clock] that abstracts accesses to [DelayController]'s virtual clock. + * Control the virtual clock of a [CoroutineDispatcher]. */ -@OptIn(ExperimentalCoroutinesApi::class) -public class DelayControllerClockAdapter( - private val delayController: DelayController, - private val zone: ZoneId = ZoneId.systemDefault() -) : Clock() { - override fun getZone(): ZoneId = zone +public interface SimulationController { + /** + * The current virtual clock as it is known to this Dispatcher. + */ + public val clock: Clock - override fun withZone(zone: ZoneId): Clock = DelayControllerClockAdapter(delayController, zone) - - override fun instant(): Instant = Instant.ofEpochMilli(millis()) - - override fun millis(): Long = delayController.currentTime + /** + * Immediately execute all pending tasks and advance the virtual clock-time to the last delay. + * + * If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle` + * returns. + * + * @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds. + */ + public fun advanceUntilIdle(): Long } diff --git a/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt new file mode 100644 index 00000000..e2f7874c --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.* +import java.lang.Runnable +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual + * clock for time management. + */ +@OptIn(InternalCoroutinesApi::class) +public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay { + /** + * The virtual clock of this dispatcher. + */ + override val clock: Clock = VirtualClock() + + /** + * Queue of ordered tasks to run. + */ + private val queue = PriorityQueue<TimedRunnable>() + + /** + * Global order counter. + */ + private var _counter = 0L + + /** + * The current virtual time of simulation + */ + private var _time = 0L + + override fun dispatch(context: CoroutineContext, block: Runnable) { + block.run() + } + + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + post(block) + } + + @OptIn(ExperimentalCoroutinesApi::class) + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { + postDelayed(CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }, timeMillis) + } + + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + val node = postDelayed(block, timeMillis) + return object : DisposableHandle { + override fun dispose() { + queue.remove(node) + } + } + } + + override fun toString(): String { + return "SimulationCoroutineDispatcher[time=${_time}ms, queued=${queue.size}]" + } + + private fun post(block: Runnable) = + queue.add(TimedRunnable(block, _counter++)) + + private fun postDelayed(block: Runnable, delayTime: Long) = + TimedRunnable(block, _counter++, safePlus(_time, delayTime)) + .also { + queue.add(it) + } + + private fun safePlus(currentTime: Long, delayTime: Long): Long { + check(delayTime >= 0) + val result = currentTime + delayTime + if (result < currentTime) return Long.MAX_VALUE // clamp on overflow + return result + } + + override fun advanceUntilIdle(): Long { + val queue = queue + val oldTime = _time + while (queue.isNotEmpty()) { + val current = queue.poll() + + // If the scheduled time is 0 (immediate) use current virtual time + if (current.time != 0L) { + _time = current.time + } + + current.run() + } + + return _time - oldTime + } + + private inner class VirtualClock(private val zone: ZoneId = ZoneId.systemDefault()) : Clock() { + override fun getZone(): ZoneId = zone + + override fun withZone(zone: ZoneId): Clock = VirtualClock(zone) + + override fun instant(): Instant = Instant.ofEpochMilli(millis()) + + override fun millis(): Long = _time + + override fun toString(): String = "SimulationCoroutineDispatcher.VirtualClock[time=$_time]" + } + + /** + * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled + * in the future. + */ + private class CancellableContinuationRunnable<T>( + @JvmField val continuation: CancellableContinuation<T>, + private val block: CancellableContinuation<T>.() -> Unit + ) : Runnable { + override fun run() = continuation.block() + } + + /** + * A Runnable for our event loop that represents a task to perform at a time. + */ + private class TimedRunnable( + @JvmField val runnable: Runnable, + private val count: Long = 0, + @JvmField val time: Long = 0 + ) : Comparable<TimedRunnable>, Runnable by runnable { + override fun compareTo(other: TimedRunnable) = if (time == other.time) { + count.compareTo(other.count) + } else { + time.compareTo(other.time) + } + + override fun toString() = "TimedRunnable[time=$time, run=$runnable]" + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt new file mode 100644 index 00000000..1da7f0fa --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * A scope which provides detailed control over the execution of coroutines for simulations. + */ +public interface SimulationCoroutineScope : CoroutineScope, SimulationController + +private class SimulationCoroutineScopeImpl( + override val coroutineContext: CoroutineContext +) : + SimulationCoroutineScope, + SimulationController by coroutineContext.simulationController + +/** + * A scope which provides detailed control over the execution of coroutines for simulations. + * + * If the provided context does not provide a [ContinuationInterceptor] (Dispatcher) or [CoroutineExceptionHandler], the + * scope adds [SimulationCoroutineDispatcher] automatically. + */ +@Suppress("FunctionName") +public fun SimulationCoroutineScope(context: CoroutineContext = EmptyCoroutineContext): SimulationCoroutineScope { + var safeContext = context + if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher() + return SimulationCoroutineScopeImpl(safeContext) +} + +private inline val CoroutineContext.simulationController: SimulationController + get() { + val handler = this[ContinuationInterceptor] + return handler as? SimulationController ?: throw IllegalArgumentException( + "SimulationCoroutineScope requires a SimulationController such as SimulatorCoroutineDispatcher as " + + "the ContinuationInterceptor (Dispatcher)" + ) + } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index f2eea97c..beda3eaa 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -24,12 +24,10 @@ package org.opendc.simulator.resources import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.test.runBlockingTest -import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.simulator.core.SimulationCoroutineScope +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.utils.TimerScheduler import org.openjdk.jmh.annotations.* -import java.time.Clock import java.util.concurrent.TimeUnit @State(Scope.Thread) @@ -38,15 +36,13 @@ import java.util.concurrent.TimeUnit @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) @OptIn(ExperimentalCoroutinesApi::class) class SimResourceBenchmarks { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock + private lateinit var scope: SimulationCoroutineScope private lateinit var scheduler: TimerScheduler<Any> @Setup fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - scheduler = TimerScheduler(scope.coroutineContext, clock) + scope = SimulationCoroutineScope() + scheduler = TimerScheduler(scope.coroutineContext, scope.clock) } @State(Scope.Thread) @@ -61,38 +57,38 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSource(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val provider = SimResourceSource(4200.0, clock, scheduler) - return@runBlockingTest provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(state.consumers[0]) } } @Benchmark fun benchmarkForwardOverhead(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val provider = SimResourceSource(4200.0, clock, scheduler) val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) - return@runBlockingTest forwarder.consume(state.consumers[0]) + return@runBlockingSimulation forwarder.consume(state.consumers[0]) } } @Benchmark fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val switch = SimResourceSwitchMaxMin(clock) switch.addInput(SimResourceSource(3000.0, clock, scheduler)) switch.addInput(SimResourceSource(3000.0, clock, scheduler)) val provider = switch.addOutput(3500.0) - return@runBlockingTest provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(state.consumers[0]) } } @Benchmark fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val switch = SimResourceSwitchMaxMin(clock) switch.addInput(SimResourceSource(3000.0, clock, scheduler)) @@ -109,20 +105,20 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() switch.addInput(SimResourceSource(3000.0, clock, scheduler)) switch.addInput(SimResourceSource(3000.0, clock, scheduler)) val provider = switch.addOutput(3500.0) - return@runBlockingTest provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(state.consumers[0]) } } @Benchmark fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { - return scope.runBlockingTest { + return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() switch.addInput(SimResourceSource(3000.0, clock, scheduler)) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index bf8c6d1f..e272abb8 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,14 +26,13 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler /** @@ -42,8 +41,7 @@ import org.opendc.utils.TimerScheduler @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceAggregatorMaxMinTest { @Test - fun testSingleCapacity() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testSingleCapacity() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) @@ -65,7 +63,7 @@ internal class SimResourceAggregatorMaxMinTest { yield() assertAll( - { assertEquals(1000, currentTime) }, + { assertEquals(1000, clock.millis()) }, { assertEquals(listOf(0.0, 0.5, 0.0), usage) } ) } finally { @@ -74,8 +72,7 @@ internal class SimResourceAggregatorMaxMinTest { } @Test - fun testDoubleCapacity() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testDoubleCapacity() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) @@ -93,7 +90,7 @@ internal class SimResourceAggregatorMaxMinTest { aggregator.output.consume(adapter) yield() assertAll( - { assertEquals(1000, currentTime) }, + { assertEquals(1000, clock.millis()) }, { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { @@ -102,8 +99,7 @@ internal class SimResourceAggregatorMaxMinTest { } @Test - fun testOvercommit() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testOvercommit() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) @@ -121,7 +117,7 @@ internal class SimResourceAggregatorMaxMinTest { try { aggregator.output.consume(consumer) yield() - assertEquals(1000, currentTime) + assertEquals(1000, clock.millis()) verify(exactly = 2) { consumer.onNext(any()) } } finally { @@ -130,8 +126,7 @@ internal class SimResourceAggregatorMaxMinTest { } @Test - fun testException() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testException() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) @@ -156,8 +151,7 @@ internal class SimResourceAggregatorMaxMinTest { } @Test - fun testAdjustCapacity() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testAdjustCapacity() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) @@ -175,15 +169,14 @@ internal class SimResourceAggregatorMaxMinTest { sources[0].capacity = 0.5 } yield() - assertEquals(2334, currentTime) + assertEquals(2334, clock.millis()) } finally { aggregator.output.close() } } @Test - fun testFailOverCapacity() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testFailOverCapacity() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) @@ -201,7 +194,7 @@ internal class SimResourceAggregatorMaxMinTest { sources[0].capacity = 0.5 } yield() - assertEquals(1000, currentTime) + assertEquals(1000, clock.millis()) } finally { aggregator.output.close() } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 030a0f6b..be909556 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -24,9 +24,8 @@ package org.opendc.simulator.resources import io.mockk.* import kotlinx.coroutines.* -import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* -import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.simulator.core.runBlockingSimulation /** * A test suite for the [SimAbstractResourceContext] class. @@ -34,9 +33,7 @@ import org.opendc.simulator.utils.DelayControllerClockAdapter @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { @Test - fun testFlushWithoutCommand() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) - + fun testFlushWithoutCommand() = runBlockingSimulation { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit @@ -50,9 +47,7 @@ class SimResourceContextTest { } @Test - fun testIntermediateFlush() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) - + fun testIntermediateFlush() = runBlockingSimulation { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit @@ -70,9 +65,7 @@ class SimResourceContextTest { } @Test - fun testIntermediateFlushIdle() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) - + fun testIntermediateFlushIdle() = runBlockingSimulation { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit @@ -95,9 +88,7 @@ class SimResourceContextTest { } @Test - fun testDoubleStart() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) - + fun testDoubleStart() = runBlockingSimulation { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index dbba6160..39f74481 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -27,12 +27,11 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler /** @@ -41,8 +40,7 @@ import org.opendc.utils.TimerScheduler @OptIn(ExperimentalCoroutinesApi::class) class SimResourceSourceTest { @Test - fun testSpeed() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testSpeed() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -66,8 +64,7 @@ class SimResourceSourceTest { } @Test - fun testAdjustCapacity() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testAdjustCapacity() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(1.0, clock, scheduler) @@ -79,7 +76,7 @@ class SimResourceSourceTest { delay(1000) provider.capacity = 0.5 } - assertEquals(3000, currentTime) + assertEquals(3000, clock.millis()) verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } } finally { scheduler.close() @@ -88,8 +85,7 @@ class SimResourceSourceTest { } @Test - fun testSpeedLimit() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testSpeedLimit() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -117,8 +113,7 @@ class SimResourceSourceTest { * [SimResourceConsumer.onNext]. */ @Test - fun testIntermediateInterrupt() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testIntermediateInterrupt() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -142,8 +137,7 @@ class SimResourceSourceTest { } @Test - fun testInterrupt() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testInterrupt() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -173,7 +167,7 @@ class SimResourceSourceTest { } provider.consume(consumer) - assertEquals(0, currentTime) + assertEquals(0, clock.millis()) } finally { scheduler.close() provider.close() @@ -181,8 +175,7 @@ class SimResourceSourceTest { } @Test - fun testFailure() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testFailure() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -202,8 +195,7 @@ class SimResourceSourceTest { } @Test - fun testExceptionPropagationOnNext() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testExceptionPropagationOnNext() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -224,8 +216,7 @@ class SimResourceSourceTest { } @Test - fun testConcurrentConsumption() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testConcurrentConsumption() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -249,8 +240,7 @@ class SimResourceSourceTest { } @Test - fun testClosedConsumption() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testClosedConsumption() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -272,8 +262,7 @@ class SimResourceSourceTest { } @Test - fun testCloseDuringConsumption() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testCloseDuringConsumption() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -288,7 +277,7 @@ class SimResourceSourceTest { delay(500) provider.close() - assertEquals(500, currentTime) + assertEquals(500, clock.millis()) } finally { scheduler.close() provider.close() @@ -296,8 +285,7 @@ class SimResourceSourceTest { } @Test - fun testIdle() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testIdle() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -310,7 +298,7 @@ class SimResourceSourceTest { try { provider.consume(consumer) - assertEquals(500, currentTime) + assertEquals(500, clock.millis()) } finally { scheduler.close() provider.close() @@ -320,8 +308,7 @@ class SimResourceSourceTest { @Test fun testInfiniteSleep() { assertThrows<IllegalStateException> { - runBlockingTest { - val clock = DelayControllerClockAdapter(this) + runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) @@ -342,8 +329,7 @@ class SimResourceSourceTest { } @Test - fun testIncorrectDeadline() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testIncorrectDeadline() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, clock, scheduler) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 9a40edc4..f7d17867 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -25,15 +25,14 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler /** @@ -45,8 +44,7 @@ internal class SimResourceSwitchExclusiveTest { * Test a trace workload. */ @Test - fun testTrace() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testTrace() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val speed = mutableListOf<Double>() @@ -80,7 +78,7 @@ internal class SimResourceSwitchExclusiveTest { assertAll( { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, currentTime) { "Took enough time" } } + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } ) } @@ -88,8 +86,7 @@ internal class SimResourceSwitchExclusiveTest { * Test runtime workload on hypervisor. */ @Test - fun testRuntimeWorkload() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testRuntimeWorkload() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 @@ -109,15 +106,14 @@ internal class SimResourceSwitchExclusiveTest { } finally { provider.close() } - assertEquals(duration, currentTime) { "Took enough time" } + assertEquals(duration, clock.millis()) { "Took enough time" } } /** * Test two workloads running sequentially. */ @Test - fun testTwoWorkloads() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testTwoWorkloads() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 @@ -152,15 +148,14 @@ internal class SimResourceSwitchExclusiveTest { } finally { provider.close() } - assertEquals(duration * 2, currentTime) { "Took enough time" } + assertEquals(duration * 2, clock.millis()) { "Took enough time" } } /** * Test concurrent workloads on the machine. */ @Test - fun testConcurrentWorkloadFails() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testConcurrentWorkloadFails() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val duration = 5 * 60L * 1000 diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 5f4fd187..7416f277 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -27,12 +27,11 @@ import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler /** @@ -41,8 +40,7 @@ import org.opendc.utils.TimerScheduler @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceSwitchMaxMinTest { @Test - fun testSmoke() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testSmoke() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val switch = SimResourceSwitchMaxMin(clock) @@ -67,8 +65,7 @@ internal class SimResourceSwitchMaxMinTest { * Test overcommitting of resources via the hypervisor with a single VM. */ @Test - fun testOvercommittedSingle() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testOvercommittedSingle() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { @@ -118,7 +115,7 @@ internal class SimResourceSwitchMaxMinTest { { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, currentTime) } + { assertEquals(1200000, clock.millis()) } ) } @@ -126,8 +123,7 @@ internal class SimResourceSwitchMaxMinTest { * Test overcommitting of resources via the hypervisor with two VMs. */ @Test - fun testOvercommittedDual() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testOvercommittedDual() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { @@ -191,7 +187,7 @@ internal class SimResourceSwitchMaxMinTest { { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, - { assertEquals(1200000, currentTime) } + { assertEquals(1200000, clock.millis()) } ) } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 38598f6b..d2ad73bc 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -27,12 +27,11 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler /** @@ -41,9 +40,8 @@ import org.opendc.utils.TimerScheduler @OptIn(ExperimentalCoroutinesApi::class) internal class SimResourceTransformerTest { @Test - fun testExitImmediately() = runBlockingTest { + fun testExitImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(2000.0, clock, scheduler) @@ -63,9 +61,8 @@ internal class SimResourceTransformerTest { } @Test - fun testExit() = runBlockingTest { + fun testExit() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(2000.0, clock, scheduler) @@ -91,7 +88,7 @@ internal class SimResourceTransformerTest { } @Test - fun testState() = runBlockingTest { + fun testState() = runBlockingSimulation { val forwarder = SimResourceForwarder() val consumer = object : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Exit @@ -112,7 +109,7 @@ internal class SimResourceTransformerTest { } @Test - fun testCancelPendingDelegate() = runBlockingTest { + fun testCancelPendingDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -125,9 +122,8 @@ internal class SimResourceTransformerTest { } @Test - fun testCancelStartedDelegate() = runBlockingTest { + fun testCancelStartedDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(2000.0, clock, scheduler) @@ -145,9 +141,8 @@ internal class SimResourceTransformerTest { } @Test - fun testCancelPropagation() = runBlockingTest { + fun testCancelPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(2000.0, clock, scheduler) @@ -165,9 +160,8 @@ internal class SimResourceTransformerTest { } @Test - fun testExitPropagation() = runBlockingTest { + fun testExitPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder(isCoupled = true) - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(2000.0, clock, scheduler) @@ -182,9 +176,8 @@ internal class SimResourceTransformerTest { } @Test - fun testAdjustCapacity() = runBlockingTest { + fun testAdjustCapacity() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(1.0, clock, scheduler) @@ -197,14 +190,13 @@ internal class SimResourceTransformerTest { source.capacity = 0.5 } - assertEquals(3000, currentTime) + assertEquals(3000, clock.millis()) verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } } @Test - fun testTransformExit() = runBlockingTest { + fun testTransformExit() = runBlockingSimulation { val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val clock = DelayControllerClockAdapter(this) val scheduler = TimerScheduler<Any>(coroutineContext, clock) val source = SimResourceSource(1.0, clock, scheduler) @@ -212,7 +204,7 @@ internal class SimResourceTransformerTest { source.startConsumer(forwarder) forwarder.consume(consumer) - assertEquals(0, currentTime) + assertEquals(0, clock.millis()) verify(exactly = 1) { consumer.onNext(any()) } } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index 4d6b19ee..bf58b1b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -23,11 +23,10 @@ package org.opendc.simulator.resources import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler /** @@ -36,8 +35,7 @@ import org.opendc.utils.TimerScheduler @OptIn(ExperimentalCoroutinesApi::class) internal class SimWorkConsumerTest { @Test - fun testSmoke() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testSmoke() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(1.0, clock, scheduler) @@ -45,15 +43,14 @@ internal class SimWorkConsumerTest { try { provider.consume(consumer) - assertEquals(1000, currentTime) + assertEquals(1000, clock.millis()) } finally { provider.close() } } @Test - fun testUtilization() = runBlockingTest { - val clock = DelayControllerClockAdapter(this) + fun testUtilization() = runBlockingSimulation { val scheduler = TimerScheduler<Any>(coroutineContext, clock) val provider = SimResourceSource(1.0, clock, scheduler) @@ -61,7 +58,7 @@ internal class SimWorkConsumerTest { try { provider.consume(consumer) - assertEquals(2000, currentTime) + assertEquals(2000, clock.millis()) } finally { provider.close() } |
