diff options
25 files changed, 410 insertions, 213 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 6d87e444..68667a8c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -105,6 +105,7 @@ public class SimHost( * The hypervisor to run multiple workloads. */ public val hypervisor: SimHypervisor = hypervisor.create( + scope.coroutineContext, clock, object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index bae31921..15714aca 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -34,7 +34,8 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.utils.TimerScheduler +import org.opendc.simulator.resources.SimResourceScheduler +import org.opendc.simulator.resources.SimResourceSchedulerTrampoline import org.openjdk.jmh.annotations.* import java.util.concurrent.TimeUnit @@ -45,13 +46,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: TimerScheduler<Any> + private lateinit var scheduler: SimResourceScheduler private lateinit var machineModel: SimMachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = TimerScheduler(scope.coroutineContext, scope.clock) + scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -61,7 +62,7 @@ class SimMachineBenchmarks { ) } - @State(Scope.Thread) + @State(Scope.Benchmark) class Workload { lateinit var trace: Sequence<SimTraceWorkload.Fragment> @@ -120,7 +121,7 @@ class SimMachineBenchmarks { coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor() + val hypervisor = SimFairShareHypervisor(scheduler) launch { machine.run(hypervisor) } @@ -142,12 +143,12 @@ class SimMachineBenchmarks { coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor() + val hypervisor = SimFairShareHypervisor(scheduler) launch { machine.run(hypervisor) } coroutineScope { - repeat(2) { i -> + repeat(2) { val vm = hypervisor.createMachine(machineModel) launch { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 09ee601e..27ebba21 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* import org.opendc.simulator.compute.cpufreq.ScalingDriver import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit @@ -60,7 +59,7 @@ public class SimBareMetalMachine( /** * The [TimerScheduler] to use for scheduling the interrupts. */ - private val scheduler = TimerScheduler<Any>(this.context, clock) + private val scheduler = SimResourceSchedulerTrampoline(this.context, clock) override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) } @@ -96,7 +95,6 @@ public class SimBareMetalMachine( override fun close() { super.close() - scheduler.close() scope.cancel() } @@ -107,7 +105,7 @@ public class SimBareMetalMachine( /** * The actual resource supporting the processing unit. */ - private val source = SimResourceSource(model.frequency, clock, scheduler) + private val source = SimResourceSource(model.frequency, scheduler) override val speed: Double get() = source.speed diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index fa677de9..11aec2de 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -31,13 +31,13 @@ import org.opendc.simulator.resources.* * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { +public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( - ctx.clock, + scheduler, object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( switch: SimResourceSwitchMaxMin, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index 02eb6ad0..2ab3ea09 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -22,11 +22,17 @@ package org.opendc.simulator.compute +import org.opendc.simulator.resources.SimResourceSchedulerTrampoline +import java.time.Clock +import kotlin.coroutines.CoroutineContext + /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. */ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" - override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimFairShareHypervisor(listener) + override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { + return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener) + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt index a5b4526b..b66020f4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt @@ -22,6 +22,9 @@ package org.opendc.simulator.compute +import java.time.Clock +import kotlin.coroutines.CoroutineContext + /** * A service provider interface for constructing a [SimHypervisor]. */ @@ -37,5 +40,5 @@ public interface SimHypervisorProvider { /** * Create a [SimHypervisor] instance with the specified [listener]. */ - public fun create(listener: SimHypervisor.Listener? = null): SimHypervisor + public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index e2044d05..83b924d7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -22,11 +22,16 @@ package org.opendc.simulator.compute +import java.time.Clock +import kotlin.coroutines.CoroutineContext + /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. */ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor() + override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { + return SimSpaceSharedHypervisor() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index a067dd2e..8886caa7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -39,6 +39,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceSchedulerTrampoline /** * Test suite for the [SimHypervisor] class. @@ -93,7 +94,7 @@ internal class SimHypervisorTest { ) val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(listener) + val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) launch { machine.run(hypervisor) @@ -167,7 +168,7 @@ internal class SimHypervisorTest { coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(listener) + val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) launch { machine.run(hypervisor) diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index beda3eaa..cd5f33bd 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.utils.TimerScheduler +import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.openjdk.jmh.annotations.* import java.util.concurrent.TimeUnit @@ -37,67 +37,76 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimResourceBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: TimerScheduler<Any> + private lateinit var scheduler: SimResourceScheduler @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = TimerScheduler(scope.coroutineContext, scope.clock) + scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) } @State(Scope.Thread) class Workload { - lateinit var consumers: Array<SimResourceConsumer> + lateinit var trace: Sequence<SimTraceConsumer.Fragment> @Setup fun setUp() { - consumers = Array(3) { createSimpleConsumer() } + trace = sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ) } } @Benchmark fun benchmarkSource(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, clock, scheduler) - return@runBlockingSimulation provider.consume(state.consumers[0]) + val provider = SimResourceSource(4200.0, scheduler) + return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @Benchmark fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, clock, scheduler) + val provider = SimResourceSource(4200.0, scheduler) val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(state.consumers[0]) + return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) } } @Benchmark fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(clock) + val switch = SimResourceSwitchMaxMin(scheduler) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) val provider = switch.addOutput(3500.0) - return@runBlockingSimulation provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @Benchmark fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(clock) + val switch = SimResourceSwitchMaxMin(scheduler) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) repeat(3) { i -> launch { val provider = switch.addOutput(3500.0) - provider.consume(state.consumers[i]) + provider.consume(SimTraceConsumer(state.trace)) } } } @@ -108,11 +117,11 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) val provider = switch.addOutput(3500.0) - return@runBlockingSimulation provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -121,13 +130,13 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) - repeat(2) { i -> + repeat(2) { launch { val provider = switch.addOutput(3500.0) - provider.consume(state.consumers[i]) + provider.consume(SimTraceConsumer(state.trace)) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 1bcaf45f..6ae04f27 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -22,12 +22,10 @@ package org.opendc.simulator.resources -import java.time.Clock - /** * Abstract implementation of [SimResourceAggregator]. */ -public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { +public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator { /** * This method is invoked when the resource consumer consumes resources. */ @@ -77,7 +75,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : protected val outputContext: SimResourceContext get() = context - private val context = object : SimAbstractResourceContext(0.0, clock, _output) { + private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) { override val remainingWork: Double get() { val now = clock.millis() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index d2f585b1..c03bfad5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -31,9 +31,16 @@ import kotlin.math.min */ public abstract class SimAbstractResourceContext( initialCapacity: Double, - override val clock: Clock, + private val scheduler: SimResourceScheduler, private val consumer: SimResourceConsumer -) : SimResourceContext { +) : SimResourceContext, SimResourceFlushable { + + /** + * The clock of the context. + */ + public override val clock: Clock + get() = scheduler.clock + /** * The capacity of the resource. */ @@ -143,21 +150,12 @@ public abstract class SimAbstractResourceContext( flush(isIntermediate = true) doStop() - } catch (cause: Throwable) { - doFail(cause) } finally { isProcessing = false } } - /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun flush(isIntermediate: Boolean = false) { + override fun flush(isIntermediate: Boolean) { // Flush is no-op when the consumer is finished or not yet started if (state != SimResourceState.Active) { return @@ -226,7 +224,7 @@ public abstract class SimAbstractResourceContext( return } - flush() + scheduler.schedule(this, isIntermediate = false) } override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" @@ -234,7 +232,7 @@ public abstract class SimAbstractResourceContext( /** * A flag to indicate that the resource is currently processing a command. */ - protected var isProcessing: Boolean = false + private var isProcessing: Boolean = false /** * The current command that is being processed. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 5d550ad8..5665abd1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -22,12 +22,10 @@ package org.opendc.simulator.resources -import java.time.Clock - /** * A [SimResourceAggregator] that distributes the load equally across the input resources. */ -public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { +public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) { private val consumers = mutableListOf<Input>() override fun doConsume(work: Double, limit: Double, deadline: Long) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 8128c98b..a76cb1e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources -import java.time.Clock import kotlin.math.max import kotlin.math.min @@ -31,7 +30,7 @@ import kotlin.math.min */ public class SimResourceDistributorMaxMin( override val input: SimResourceProvider, - private val clock: Clock, + private val scheduler: SimResourceScheduler, private val listener: Listener? = null ) : SimResourceDistributor { override val outputs: Set<SimResourceProvider> @@ -220,7 +219,7 @@ public class SimResourceDistributorMaxMin( } } - assert(deadline >= clock.millis()) { "Deadline already passed" } + assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" } this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork @@ -337,7 +336,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { + ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> { /** * The current command that is processed by the vCPU. */ @@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin( } } + private var isProcessing: Boolean = false + override fun interrupt() { // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead // to infinite recursion. @@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin( return } - super.interrupt() + try { + isProcessing = false - // Force the scheduler to re-schedule - schedule() + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } finally { + isProcessing = true + } } override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt index 8d2587b1..f6a1a42e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt @@ -22,22 +22,16 @@ package org.opendc.simulator.resources -import org.opendc.simulator.resources.consumer.SimTraceConsumer - /** - * Helper function to create simple consumer workload. + * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer. */ -fun createSimpleConsumer(): SimResourceConsumer { - return SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0), - SimTraceConsumer.Fragment(1000, 400.0), - SimTraceConsumer.Fragment(1000, 100.0), - SimTraceConsumer.Fragment(1000, 3000.0), - SimTraceConsumer.Fragment(1000, 4500.0), - ), - ) +public interface SimResourceFlushable { + /** + * Flush the current active resource consumption. + * + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun flush(isIntermediate: Boolean) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt new file mode 100644 index 00000000..a228c47b --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource + * providers and consumers. + * + * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more + * efficiently, reducing the overall work needed per update. + */ +public interface SimResourceScheduler { + /** + * The [Clock] associated with this scheduler. + */ + public val clock: Clock + + /** + * Schedule a direct interrupt for the resource context represented by [flushable]. + * + * @param flushable The resource context that needs to be flushed. + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false) + + /** + * Schedule an interrupt in the future for the resource context represented by [flushable]. + * + * This method will override earlier calls to this method for the same [flushable]. + * + * @param flushable The resource context that needs to be flushed. + * @param timestamp The timestamp when the interrupt should happen. + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false) + + /** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ + public fun batch(block: () -> Unit) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt new file mode 100644 index 00000000..cdbb4a6c --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.utils.TimerScheduler +import java.time.Clock +import java.util.ArrayDeque +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after. + * + * @param clock The virtual simulation clock. + */ +public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler { + /** + * The [TimerScheduler] to actually schedule the interrupts. + */ + private val timers = TimerScheduler<Any>(context, clock) + + /** + * A flag to indicate that an interrupt is currently running already. + */ + private var isRunning: Boolean = false + + /** + * The queue of resources to be flushed. + */ + private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>() + + override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) { + queue.add(flushable to isIntermediate) + + if (isRunning) { + return + } + + flush() + } + + override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) { + timers.startSingleTimerTo(flushable, timestamp) { + schedule(flushable, isIntermediate) + } + } + + override fun batch(block: () -> Unit) { + val wasAlreadyRunning = isRunning + try { + isRunning = true + block() + } finally { + if (!wasAlreadyRunning) { + isRunning = false + } + } + } + + /** + * Flush the scheduled queue. + */ + private fun flush() { + val visited = mutableSetOf<SimResourceFlushable>() + try { + isRunning = true + while (queue.isNotEmpty()) { + val (flushable, isIntermediate) = queue.poll() + flushable.flush(isIntermediate) + visited.add(flushable) + } + } finally { + isRunning = false + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index fe569096..3277b889 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import org.opendc.utils.TimerScheduler -import java.time.Clock import kotlin.math.ceil import kotlin.math.min @@ -31,13 +29,11 @@ import kotlin.math.min * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * * @param initialCapacity The initial capacity of the resource. - * @param clock The virtual clock to track simulation time. * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( initialCapacity: Double, - private val clock: Clock, - private val scheduler: TimerScheduler<Any> + private val scheduler: SimResourceScheduler ) : SimResourceProvider { /** * The current processing speed of the resource. @@ -96,22 +92,21 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) { override fun onIdle(deadline: Long) { // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } + scheduler.schedule(this, deadline) } } override fun onConsume(work: Double, limit: Double, deadline: Long) { val until = min(deadline, clock.millis() + getDuration(work, speed)) - - scheduler.startSingleTimerTo(this, until, ::flush) + scheduler.schedule(this, until) } override fun onFinish() { - scheduler.cancel(this) + cancel() ctx = null diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index c796c251..5dc1e68d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,14 +23,13 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import java.time.Clock /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ public class SimResourceSwitchMaxMin( - clock: Clock, + scheduler: SimResourceScheduler, private val listener: Listener? = null ) : SimResourceSwitch { private val _outputs = mutableSetOf<SimResourceProvider>() @@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin( /** * The aggregator to aggregate the resources. */ - private val aggregator = SimResourceAggregatorMaxMin(clock) + private val aggregator = SimResourceAggregatorMaxMin(scheduler) /** * The distributor to distribute the aggregated resources. */ private val distributor = SimResourceDistributorMaxMin( - aggregator.output, clock, + aggregator.output, scheduler, object : SimResourceDistributorMaxMin.Listener { override fun onSliceFinish( switch: SimResourceDistributor, diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index e78bcdac..2b32300e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * Test suite for the [SimResourceAggregatorMaxMin] class. @@ -42,19 +41,19 @@ import org.opendc.utils.TimerScheduler internal class SimResourceAggregatorMaxMinTest { @Test fun testSingleCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val forwarder = SimResourceForwarder() val sources = listOf( forwarder, - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) val usage = mutableListOf<Double>() - val source = SimResourceSource(1.0, clock, scheduler) + val source = SimResourceSource(1.0, scheduler) val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) source.startConsumer(adapter) @@ -73,12 +72,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -100,12 +99,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testOvercommit() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -127,12 +126,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testException() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -152,12 +151,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -177,12 +176,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 8c15ec71..2e2d6588 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -34,24 +34,26 @@ import org.opendc.simulator.core.runBlockingSimulation class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} override fun onFinish() {} } - context.flush() + context.flush(isIntermediate = false) } @Test fun testIntermediateFlush() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -66,10 +68,11 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -89,10 +92,11 @@ class SimResourceContextTest { @Test fun testDoubleStart() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -101,4 +105,43 @@ class SimResourceContextTest { context.start() assertThrows<IllegalStateException> { context.start() } } + + @Test + fun testIdempodentCapacityChange() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { + override fun onIdle(deadline: Long) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + override fun onFinish() {} + } + + context.start() + context.capacity = 4200.0 + + verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + } + + @Test + fun testFailureNoInfiniteLoop() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Exit + every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") + every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") + + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { + override fun onIdle(deadline: Long) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + override fun onFinish() {} + } + + context.start() + + delay(1) + + verify(exactly = 1) { consumer.onFailure(any(), any()) } + } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 361a1516..5e86088d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * A test suite for the [SimResourceSource] class. @@ -41,9 +40,9 @@ import org.opendc.utils.TimerScheduler class SimResourceSourceTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -58,15 +57,14 @@ class SimResourceSourceTest { assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { - scheduler.close() provider.close() } } @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val provider = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -79,16 +77,15 @@ class SimResourceSourceTest { assertEquals(3000, clock.millis()) verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } finally { - scheduler.close() provider.close() } } @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -103,7 +100,6 @@ class SimResourceSourceTest { assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { - scheduler.close() provider.close() } } @@ -114,9 +110,9 @@ class SimResourceSourceTest { */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = object : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { @@ -131,16 +127,15 @@ class SimResourceSourceTest { try { provider.consume(consumer) } finally { - scheduler.close() provider.close() } } @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) lateinit var resCtx: SimResourceContext val consumer = object : SimResourceConsumer { @@ -173,16 +168,15 @@ class SimResourceSourceTest { assertEquals(0, clock.millis()) } finally { - scheduler.close() provider.close() } } @Test fun testFailure() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } @@ -193,16 +187,15 @@ class SimResourceSourceTest { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -214,16 +207,15 @@ class SimResourceSourceTest { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -238,16 +230,15 @@ class SimResourceSourceTest { } } } finally { - scheduler.close() provider.close() } } @Test fun testClosedConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -260,16 +251,15 @@ class SimResourceSourceTest { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } @Test fun testCloseDuringConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -283,16 +273,15 @@ class SimResourceSourceTest { assertEquals(500, clock.millis()) } finally { - scheduler.close() provider.close() } } @Test fun testIdle() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -304,7 +293,6 @@ class SimResourceSourceTest { assertEquals(500, clock.millis()) } finally { - scheduler.close() provider.close() } } @@ -313,9 +301,9 @@ class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows<IllegalStateException> { runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -325,7 +313,6 @@ class SimResourceSourceTest { try { provider.consume(consumer) } finally { - scheduler.close() provider.close() } } @@ -334,9 +321,9 @@ class SimResourceSourceTest { @Test fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -348,7 +335,6 @@ class SimResourceSourceTest { assertThrows<IllegalArgumentException> { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 1b1f7790..32b6d8ad 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.utils.TimerScheduler /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -45,7 +44,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val speed = mutableListOf<Double>() @@ -61,7 +60,7 @@ internal class SimResourceSwitchExclusiveTest { ) val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) val forwarder = SimResourceForwarder() val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) source.startConsumer(adapter) @@ -87,14 +86,14 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) switch.addInput(source) @@ -114,7 +113,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer { @@ -138,7 +137,7 @@ internal class SimResourceSwitchExclusiveTest { } val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) switch.addInput(source) @@ -159,14 +158,14 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) switch.addInput(source) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 7416f277..e7dec172 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.utils.TimerScheduler /** * Test suite for the [SimResourceSwitch] implementations @@ -41,10 +40,10 @@ import org.opendc.utils.TimerScheduler internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val switch = SimResourceSwitchMaxMin(scheduler) - val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } + val sources = List(2) { SimResourceSource(2000.0, scheduler) } sources.forEach { switch.addInput(it) } val provider = switch.addOutput(1000.0) @@ -57,7 +56,6 @@ internal class SimResourceSwitchMaxMinTest { yield() } finally { switch.close() - scheduler.close() } } @@ -66,7 +64,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L @@ -99,16 +97,15 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(clock, listener) + val switch = SimResourceSwitchMaxMin(scheduler, listener) val provider = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(3200.0, clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, scheduler)) provider.consume(workload) yield() } finally { switch.close() - scheduler.close() } assertAll( @@ -124,7 +121,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L @@ -166,12 +163,12 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(clock, listener) + val switch = SimResourceSwitchMaxMin(scheduler, listener) val providerA = switch.addOutput(3200.0) val providerB = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(3200.0, clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, scheduler)) coroutineScope { launch { providerA.consume(workloadA) } @@ -181,7 +178,6 @@ internal class SimResourceSwitchMaxMinTest { yield() } finally { switch.close() - scheduler.close() } assertAll( { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index e3ca5845..880e1755 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * A test suite for the [SimResourceTransformer] class. @@ -42,8 +41,8 @@ internal class SimResourceTransformerTest { @Test fun testExitImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) launch { source.consume(forwarder) @@ -57,14 +56,13 @@ internal class SimResourceTransformerTest { }) forwarder.close() - scheduler.close() } @Test fun testExit() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) launch { source.consume(forwarder) @@ -124,8 +122,8 @@ internal class SimResourceTransformerTest { @Test fun testCancelStartedDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) @@ -143,8 +141,8 @@ internal class SimResourceTransformerTest { @Test fun testCancelPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) @@ -162,8 +160,8 @@ internal class SimResourceTransformerTest { @Test fun testExitPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit @@ -178,8 +176,8 @@ internal class SimResourceTransformerTest { @Test fun testAdjustCapacity() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) source.startConsumer(forwarder) @@ -197,8 +195,8 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val source = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) source.startConsumer(forwarder) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index bf58b1b6..ac8b5814 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * A test suite for the [SimWorkConsumer] class. @@ -36,8 +35,8 @@ import org.opendc.utils.TimerScheduler internal class SimWorkConsumerTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 1.0) @@ -51,8 +50,8 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = TimerScheduler<Any>(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 0.5) |
