From 9e5e830e15b74f040708e98c09ea41cd96d13871 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 27 May 2021 16:34:06 +0200 Subject: simulator: Centralize resource logic in SimResourceInterpreter This change introduces the SimResourceInterpreter which centralizes the logic for scheduling and interpreting the communication between resource consumer and provider. This approach offers better performance due to avoiding invalidating the state of the resource context when not necessary. Benchmarks show in the best case a 5x performance improvement and at worst a 2x improvement. --- .../simulator/compute/SimMachineBenchmarks.kt | 21 +- .../simulator/compute/SimAbstractHypervisor.kt | 13 +- .../opendc/simulator/compute/SimAbstractMachine.kt | 67 ++-- .../simulator/compute/SimBareMetalMachine.kt | 32 +- .../simulator/compute/SimFairShareHypervisor.kt | 8 +- .../compute/SimFairShareHypervisorProvider.kt | 8 +- .../simulator/compute/SimHypervisorProvider.kt | 5 +- .../simulator/compute/SimSpaceSharedHypervisor.kt | 6 +- .../compute/SimSpaceSharedHypervisorProvider.kt | 7 +- .../opendc/simulator/compute/SimHypervisorTest.kt | 41 ++- .../org/opendc/simulator/compute/SimMachineTest.kt | 11 +- .../compute/SimSpaceSharedHypervisorTest.kt | 31 +- .../simulator/resources/SimResourceBenchmarks.kt | 30 +- .../resources/SimAbstractResourceAggregator.kt | 54 +-- .../resources/SimAbstractResourceContext.kt | 362 ------------------- .../resources/SimAbstractResourceProvider.kt | 98 +++++ .../simulator/resources/SimResourceAggregator.kt | 2 +- .../resources/SimResourceAggregatorMaxMin.kt | 5 +- .../resources/SimResourceControllableContext.kt | 64 ++++ .../simulator/resources/SimResourceDistributor.kt | 2 +- .../resources/SimResourceDistributorMaxMin.kt | 222 ++++++------ .../simulator/resources/SimResourceFlushable.kt | 37 -- .../simulator/resources/SimResourceInterpreter.kt | 99 +++++ .../resources/SimResourceProviderLogic.kt | 73 ++++ .../simulator/resources/SimResourceScheduler.kt | 69 ---- .../resources/SimResourceSchedulerTrampoline.kt | 95 ----- .../simulator/resources/SimResourceSource.kt | 84 +---- .../resources/SimResourceSwitchExclusive.kt | 1 - .../simulator/resources/SimResourceSwitchMaxMin.kt | 11 +- .../simulator/resources/SimResourceSystem.kt | 43 +++ .../resources/impl/SimResourceContextImpl.kt | 397 +++++++++++++++++++++ .../resources/impl/SimResourceInterpreterImpl.kt | 300 ++++++++++++++++ .../resources/SimResourceAggregatorMaxMinTest.kt | 13 +- .../simulator/resources/SimResourceContextTest.kt | 93 ++--- .../simulator/resources/SimResourceSourceTest.kt | 27 +- .../resources/SimResourceSwitchExclusiveTest.kt | 9 +- .../resources/SimResourceSwitchMaxMinTest.kt | 15 +- .../resources/SimResourceTransformerTest.kt | 15 +- .../simulator/resources/SimWorkConsumerTest.kt | 5 +- 39 files changed, 1482 insertions(+), 993 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt delete mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt delete mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt delete mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 15714aca..bc21edc6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -34,8 +34,7 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceScheduler -import org.opendc.simulator.resources.SimResourceSchedulerTrampoline +import org.opendc.simulator.resources.SimResourceInterpreter import org.openjdk.jmh.annotations.* import java.util.concurrent.TimeUnit @@ -46,13 +45,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: SimResourceScheduler + private lateinit var interpreter: SimResourceInterpreter private lateinit var machineModel: SimMachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) + interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -85,7 +84,7 @@ class SimMachineBenchmarks { fun benchmarkBareMetal(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace)) @@ -96,10 +95,10 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } @@ -118,10 +117,10 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(scheduler) + val hypervisor = SimFairShareHypervisor(interpreter) launch { machine.run(hypervisor) } @@ -140,10 +139,10 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(scheduler) + val hypervisor = SimFairShareHypervisor(interpreter) launch { machine.run(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 713376e7..2df307d3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -31,12 +31,13 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* +import org.opendc.simulator.resources.SimResourceSwitch import java.time.Clock /** * Abstract implementation of the [SimHypervisor] interface. */ -public abstract class SimAbstractHypervisor : SimHypervisor { +public abstract class SimAbstractHypervisor(private val interpreter: SimResourceInterpreter) : SimHypervisor { /** * The machine on which the hypervisor runs. */ @@ -122,11 +123,13 @@ public abstract class SimAbstractHypervisor : SimHypervisor { override val meta: Map = meta } - workload.onStart(ctx) + interpreter.batch { + workload.onStart(ctx) - for (cpu in cpus) { - launch { - cpu.consume(workload.getConsumer(ctx, cpu.model)) + for (cpu in cpus) { + launch { + cpu.consume(workload.getConsumer(ctx, cpu.model)) + } } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index e501033a..de2b3eef 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -27,15 +27,18 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.resources.batch import org.opendc.simulator.resources.consume -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import java.time.Clock -import kotlin.coroutines.CoroutineContext /** * Abstract implementation of the [SimMachine] interface. + * + * @param interpreter The interpreter to manage the machine's resources. */ -public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine { +public abstract class SimAbstractMachine(protected val interpreter: SimResourceInterpreter) : SimMachine, SimResourceSystem { private val _usage = MutableStateFlow(0.0) override val usage: StateFlow get() = _usage @@ -52,11 +55,6 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine */ private var isTerminated = false - /** - * The [CoroutineContext] to run in. - */ - protected abstract val context: CoroutineContext - /** * The resources allocated for this machine. */ @@ -67,7 +65,7 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine */ private inner class Context(override val meta: Map) : SimMachineContext { override val clock: Clock - get() = this@SimAbstractMachine.clock + get() = interpreter.clock override val cpus: List = this@SimAbstractMachine.cpus @@ -77,38 +75,35 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload, meta: Map): Unit = withContext(context) { - require(!isTerminated) { "Machine is terminated" } + override suspend fun run(workload: SimWorkload, meta: Map): Unit = coroutineScope { + check(!isTerminated) { "Machine is terminated" } val ctx = Context(meta) - val totalCapacity = model.cpus.sumOf { it.frequency } - - _speed = DoubleArray(model.cpus.size) { 0.0 } - var totalSpeed = 0.0 // Before the workload starts, initialize the initial power draw + _speed = DoubleArray(model.cpus.size) { 0.0 } updateUsage(0.0) - workload.onStart(ctx) + interpreter.batch { + workload.onStart(ctx) - for (cpu in cpus) { - val model = cpu.model - val consumer = workload.getConsumer(ctx, model) - val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed -> - val _speed = _speed - val _usage = _usage - - val oldSpeed = _speed[model.id] - _speed[model.id] = newSpeed - totalSpeed = totalSpeed - oldSpeed + newSpeed - - val newUsage = totalSpeed / totalCapacity - if (_usage.value != newUsage) { - updateUsage(totalSpeed / totalCapacity) - } + for (cpu in cpus) { + val model = cpu.model + val consumer = workload.getConsumer(ctx, model) + launch { cpu.consume(consumer) } } + } + } - launch { cpu.consume(adapter) } + override fun onConverge(timestamp: Long) { + val totalCapacity = model.cpus.sumOf { it.frequency } + val cpus = cpus + var totalSpeed = 0.0 + for (cpu in cpus) { + _speed[cpu.model.id] = cpu.speed + totalSpeed += cpu.speed } + + updateUsage(totalSpeed / totalCapacity) } /** @@ -119,9 +114,11 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine } override fun close() { - if (!isTerminated) { - isTerminated = true - cpus.forEach(SimProcessingUnit::close) + if (isTerminated) { + return } + + isTerminated = true + cpus.forEach(SimProcessingUnit::close) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 27ebba21..f5218ba9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -27,8 +27,7 @@ import org.opendc.simulator.compute.cpufreq.ScalingDriver import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* -import org.opendc.utils.TimerScheduler -import java.time.Clock +import org.opendc.simulator.resources.SimResourceInterpreter import kotlin.coroutines.* /** @@ -43,24 +42,11 @@ import kotlin.coroutines.* */ @OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( - context: CoroutineContext, - private val clock: Clock, + platform: SimResourceInterpreter, override val model: SimMachineModel, scalingGovernor: ScalingGovernor, scalingDriver: ScalingDriver -) : SimAbstractMachine(clock) { - /** - * The [Job] associated with this machine. - */ - private val scope = CoroutineScope(context + Job()) - - override val context: CoroutineContext = scope.coroutineContext - - /** - * The [TimerScheduler] to use for scheduling the interrupts. - */ - private val scheduler = SimResourceSchedulerTrampoline(this.context, clock) - +) : SimAbstractMachine(platform) { override val cpus: List = model.cpus.map { ProcessingUnitImpl(it) } /** @@ -75,6 +61,8 @@ public class SimBareMetalMachine( scalingGovernor.createLogic(this.scalingDriver.createContext(cpu)) } + override val parent: SimResourceSystem? = null + init { scalingGovernors.forEach { it.onStart() } } @@ -92,12 +80,6 @@ public class SimBareMetalMachine( powerDraw = scalingDriver.computePower() } - override fun close() { - super.close() - - scope.cancel() - } - /** * The [SimProcessingUnit] of this machine. */ @@ -105,7 +87,7 @@ public class SimBareMetalMachine( /** * The actual resource supporting the processing unit. */ - private val source = SimResourceSource(model.frequency, scheduler) + private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine) override val speed: Double get() = source.speed @@ -126,7 +108,7 @@ public class SimBareMetalMachine( } override fun close() { - source.interrupt() + source.close() } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 11aec2de..33e7e637 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -23,7 +23,9 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.* +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSwitch +import org.opendc.simulator.resources.SimResourceSwitchMaxMin /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single @@ -31,13 +33,13 @@ import org.opendc.simulator.resources.* * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { +public class SimFairShareHypervisor(private val interpreter: SimResourceInterpreter, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor(interpreter) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( - scheduler, + interpreter, null, object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( switch: SimResourceSwitchMaxMin, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index 2ab3ea09..68858cc1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -22,9 +22,7 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceSchedulerTrampoline -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.SimResourceInterpreter /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -32,7 +30,7 @@ import kotlin.coroutines.CoroutineContext public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" - override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { - return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener) + override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor { + return SimFairShareHypervisor(interpreter, listener) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt index b66020f4..d0753084 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt @@ -22,8 +22,7 @@ package org.opendc.simulator.compute -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.SimResourceInterpreter /** * A service provider interface for constructing a [SimHypervisor]. @@ -40,5 +39,5 @@ public interface SimHypervisorProvider { /** * Create a [SimHypervisor] instance with the specified [listener]. */ - public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor + public fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener? = null): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index fd8e546f..afb47872 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,12 +22,14 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.* +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSwitch +import org.opendc.simulator.resources.SimResourceSwitchExclusive /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { +public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index 83b924d7..c017c8ab 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -22,8 +22,7 @@ package org.opendc.simulator.compute -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.SimResourceInterpreter /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -31,7 +30,7 @@ import kotlin.coroutines.CoroutineContext public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { - return SimSpaceSharedHypervisor() + override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor { + return SimSpaceSharedHypervisor(interpreter) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 8886caa7..c1b5089c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit @@ -39,7 +40,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceSchedulerTrampoline +import org.opendc.simulator.resources.SimResourceInterpreter /** * Test suite for the [SimHypervisor] class. @@ -93,8 +94,9 @@ internal class SimHypervisorTest { ), ) - val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) + val platform = SimResourceInterpreter(coroutineContext, clock) + val machine = SimBareMetalMachine(platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val hypervisor = SimFairShareHypervisor(platform, listener) launch { machine.run(hypervisor) @@ -164,11 +166,12 @@ internal class SimHypervisorTest { ) ) + val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, model, PerformanceScalingGovernor(), + platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) + val hypervisor = SimFairShareHypervisor(platform, listener) launch { machine.run(hypervisor) @@ -190,10 +193,34 @@ internal class SimHypervisorTest { yield() assertAll( - { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, { assertEquals(1200000, clock.millis()) } ) } + + @Test + fun testMultipleCPUs() = runBlockingSimulation { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val model = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + + val platform = SimResourceInterpreter(coroutineContext, clock) + val machine = SimBareMetalMachine( + platform, model, PerformanceScalingGovernor(), + SimpleScalingDriver(ConstantPowerModel(0.0)) + ) + val hypervisor = SimFairShareHypervisor(platform) + + assertDoesNotThrow { + launch { + machine.run(hypervisor) + } + } + + machine.close() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 205f2eca..7cc3c6dd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -37,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter /** * Test suite for the [SimBareMetalMachine] class. @@ -57,7 +58,7 @@ class SimMachineTest { @Test fun testFlopsWorkload() = runBlockingSimulation { - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) @@ -76,7 +77,7 @@ class SimMachineTest { cpus = List(cpuNode.coreCount * 2) { ProcessingUnit(cpuNode, it % 2, 1000.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) @@ -90,7 +91,7 @@ class SimMachineTest { @Test fun testUsage() = runBlockingSimulation { - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) val res = mutableListOf() val job = launch { machine.usage.toList(res) } @@ -99,7 +100,7 @@ class SimMachineTest { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) yield() job.cancel() - assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } + assertEquals(listOf(0.0, 1.0, 0.0), res) { "Machine is fully utilized" } } finally { machine.close() } @@ -107,7 +108,7 @@ class SimMachineTest { @Test fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) machine.close() assertDoesNotThrow { machine.close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index ef6f536d..dd824557 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -40,6 +40,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter /** * A test suite for the [SimSpaceSharedHypervisor]. @@ -76,11 +77,12 @@ internal class SimSpaceSharedHypervisorTest { ), ) + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) val colA = launch { machine.usage.toList(usagePm) } launch { machine.run(hypervisor) } @@ -112,11 +114,12 @@ internal class SimSpaceSharedHypervisorTest { fun testRuntimeWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -135,11 +138,12 @@ internal class SimSpaceSharedHypervisorTest { fun testFlopsWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -156,11 +160,12 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -182,11 +187,12 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -206,11 +212,12 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), + interpreter, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index cd5f33bd..9233c72d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimResourceBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: SimResourceScheduler + private lateinit var interpreter: SimResourceInterpreter @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) + interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) } @State(Scope.Thread) @@ -67,7 +67,7 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSource(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, scheduler) + val provider = SimResourceSource(4200.0, interpreter) return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -75,7 +75,7 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, scheduler) + val provider = SimResourceSource(4200.0, interpreter) val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) @@ -85,10 +85,10 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(scheduler) + val switch = SimResourceSwitchMaxMin(interpreter) - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) val provider = switch.addOutput(3500.0) return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) @@ -98,12 +98,12 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(scheduler) + val switch = SimResourceSwitchMaxMin(interpreter) - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - repeat(3) { i -> + repeat(3) { launch { val provider = switch.addOutput(3500.0) provider.consume(SimTraceConsumer(state.trace)) @@ -117,8 +117,8 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) val provider = switch.addOutput(3500.0) return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) @@ -130,8 +130,8 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) repeat(2) { launch { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 653b53e0..be04d399 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -25,7 +25,10 @@ package org.opendc.simulator.resources /** * Abstract implementation of [SimResourceAggregator]. */ -public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator { +public abstract class SimAbstractResourceAggregator( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? +) : SimResourceAggregator { /** * This method is invoked when the resource consumer consumes resources. */ @@ -75,29 +78,29 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe protected val outputContext: SimResourceContext get() = context - private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) { - override val remainingWork: Double - get() { - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - _inputConsumers.sumOf { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it } - } else { - _remainingWork - } + private val context = interpreter.newContext( + _output, + object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + doIdle(deadline) + return Long.MAX_VALUE } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + doConsume(work, limit, deadline) + return Long.MAX_VALUE + } - override fun onIdle(deadline: Long) = doIdle(deadline) + override fun onFinish(ctx: SimResourceControllableContext) { + doFinish(null) + } - override fun onFinish() { - doFinish(null) - } - } + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return _inputConsumers.sumOf { it.remainingWork } + } + }, + parent + ) /** * An input for the resource aggregator. @@ -123,7 +126,13 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe */ override val ctx: SimResourceContext get() = _ctx!! - var _ctx: SimResourceContext? = null + private var _ctx: SimResourceContext? = null + + /** + * The remaining work of the consumer. + */ + val remainingWork: Double + get() = _ctx?.remainingWork ?: 0.0 /** * The resource command to run next. @@ -149,7 +158,8 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe this.command = null next } else { - context.flush(isIntermediate = true) + context.flush() + next = command this.command = null next ?: SimResourceCommand.Idle() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt deleted file mode 100644 index c03bfad5..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock -import kotlin.math.max -import kotlin.math.min - -/** - * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. - */ -public abstract class SimAbstractResourceContext( - initialCapacity: Double, - private val scheduler: SimResourceScheduler, - private val consumer: SimResourceConsumer -) : SimResourceContext, SimResourceFlushable { - - /** - * The clock of the context. - */ - public override val clock: Clock - get() = scheduler.clock - - /** - * The capacity of the resource. - */ - public final override var capacity: Double = initialCapacity - set(value) { - val oldValue = field - - // Only changes will be propagated - if (value != oldValue) { - field = value - onCapacityChange() - } - } - - /** - * The amount of work still remaining at this instant. - */ - override val remainingWork: Double - get() { - val activeCommand = activeCommand ?: return 0.0 - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(activeCommand, now).also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - /** - * A flag to indicate the state of the context. - */ - public var state: SimResourceState = SimResourceState.Pending - private set - - /** - * The current processing speed of the resource. - */ - final override var speed: Double = 0.0 - private set - - /** - * This method is invoked when the resource will idle until the specified [deadline]. - */ - public abstract fun onIdle(deadline: Long) - - /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. - */ - public abstract fun onConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer has finished. - */ - public abstract fun onFinish() - - /** - * Get the remaining work to process after a resource consumption. - * - * @param work The size of the resource consumption. - * @param speed The speed of consumption. - * @param duration The duration from the start of the consumption until now. - * @return The amount of work remaining. - */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - return if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, work - processed) - } else { - 0.0 - } - } - - /** - * Start the consumer. - */ - public fun start() { - check(state == SimResourceState.Pending) { "Consumer is already started" } - - val now = clock.millis() - - state = SimResourceState.Active - isProcessing = true - latestFlush = now - - try { - consumer.onEvent(this, SimResourceEvent.Start) - activeCommand = interpret(consumer.onNext(this), now) - } catch (cause: Throwable) { - doFail(cause) - } finally { - isProcessing = false - } - } - - /** - * Immediately stop the consumer. - */ - public fun stop() { - try { - isProcessing = true - latestFlush = clock.millis() - - flush(isIntermediate = true) - doStop() - } finally { - isProcessing = false - } - } - - override fun flush(isIntermediate: Boolean) { - // Flush is no-op when the consumer is finished or not yet started - if (state != SimResourceState.Active) { - return - } - - val now = clock.millis() - - // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. - if (isIntermediate && latestFlush >= now) { - return - } - - try { - val activeCommand = activeCommand ?: return - val (timestamp, command) = activeCommand - - // Note: accessor is reliant on activeCommand being set - val remainingWork = remainingWork - - isProcessing = true - - val duration = now - timestamp - assert(duration >= 0) { "Flush in the past" } - - this.activeCommand = when (command) { - is SimResourceCommand.Idle -> { - // We should only continue processing the next command if: - // 1. The resource consumer reached its deadline. - // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Idle(command.deadline), now) - } - } - is SimResourceCommand.Consume -> { - // We should only continue processing the next command if: - // 1. The resource consumption was finished. - // 2. The resource capacity cannot satisfy the demand. - // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) - } - } - SimResourceCommand.Exit -> - // Flush may not be called when the resource consumer has finished - throw IllegalStateException() - } - - // Flush remaining work cache - _remainingWorkFlush = Long.MIN_VALUE - } catch (cause: Throwable) { - doFail(cause) - } finally { - latestFlush = now - isProcessing = false - } - } - - override fun interrupt() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isProcessing) { - return - } - - scheduler.schedule(this, isIntermediate = false) - } - - override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" - - /** - * A flag to indicate that the resource is currently processing a command. - */ - private var isProcessing: Boolean = false - - /** - * The current command that is being processed. - */ - private var activeCommand: CommandWrapper? = null - - /** - * The latest timestamp at which the resource was flushed. - */ - private var latestFlush: Long = Long.MIN_VALUE - - /** - * Finish the consumer and resource provider. - */ - private fun doStop() { - val state = state - this.state = SimResourceState.Stopped - - if (state == SimResourceState.Active) { - activeCommand = null - try { - consumer.onEvent(this, SimResourceEvent.Exit) - onFinish() - } catch (cause: Throwable) { - doFail(cause) - } - } - } - - /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. - */ - private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { - when (command) { - is SimResourceCommand.Idle -> { - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = 0.0 - - onIdle(deadline) - consumer.onEvent(this, SimResourceEvent.Run) - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = min(capacity, limit) - onConsume(work, limit, deadline) - consumer.onEvent(this, SimResourceEvent.Run) - } - is SimResourceCommand.Exit -> { - speed = 0.0 - - doStop() - - // No need to set the next active command - return null - } - } - - return CommandWrapper(now, command) - } - - /** - * Request the workload for more work. - */ - private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) - - /** - * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. - */ - private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { - val (timestamp, command) = wrapper - val duration = now - timestamp - return when (command) { - is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) - is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 - } - } - - /** - * Fail the resource consumer. - */ - private fun doFail(cause: Throwable) { - state = SimResourceState.Stopped - activeCommand = null - - try { - consumer.onFailure(this, cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - e.printStackTrace() - } - - onFinish() - } - - /** - * Indicate that the capacity of the resource has changed. - */ - private fun onCapacityChange() { - // Do not inform the consumer if it has not been started yet - if (state != SimResourceState.Active) { - return - } - - val isThrottled = speed > capacity - - consumer.onEvent(this, SimResourceEvent.Capacity) - - // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. - // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). - if (isThrottled) { - flush(isIntermediate = true) - } - } - - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt new file mode 100644 index 00000000..519c2615 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. + */ +public abstract class SimAbstractResourceProvider( + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null, + initialCapacity: Double +) : SimResourceProvider { + /** + * The capacity of the resource. + */ + public open var capacity: Double = initialCapacity + protected set(value) { + field = value + ctx?.capacity = value + } + + /** + * The [SimResourceControllableContext] that is currently running. + */ + protected var ctx: SimResourceControllableContext? = null + + /** + * The state of the resource provider. + */ + final override var state: SimResourceState = SimResourceState.Pending + private set + + /** + * Construct the [SimResourceProviderLogic] instance for a new consumer. + */ + protected abstract fun createLogic(): SimResourceProviderLogic + + /** + * Start the specified [SimResourceControllableContext]. + */ + protected open fun start(ctx: SimResourceControllableContext) { + ctx.start() + } + + final override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = interpreter.newContext(consumer, createLogic(), parent) + + ctx.capacity = capacity + this.ctx = ctx + this.state = SimResourceState.Active + + start(ctx) + } + + override fun close() { + cancel() + state = SimResourceState.Stopped + } + + final override fun interrupt() { + ctx?.interrupt() + } + + final override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.close() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + + override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt index bb4e6a2c..5c0346cd 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -37,7 +37,7 @@ public interface SimResourceAggregator : AutoCloseable { public val inputs: Set /** - * Add the specified [input] to the switch. + * Add the specified [input] to the aggregator. */ public fun addInput(input: SimResourceProvider) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 5665abd1..bdab6def 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -25,7 +25,10 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] that distributes the load equally across the input resources. */ -public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) { +public class SimResourceAggregatorMaxMin( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null +) : SimAbstractResourceAggregator(interpreter, parent) { private val consumers = mutableListOf() override fun doConsume(work: Double, limit: Double, deadline: Long) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt new file mode 100644 index 00000000..ceaca39a --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A controllable [SimResourceContext]. + * + * This interface is used by resource providers to control the resource context. + */ +public interface SimResourceControllableContext : SimResourceContext, AutoCloseable { + /** + * The state of the resource context. + */ + public val state: SimResourceState + + /** + * The capacity of the resource. + */ + public override var capacity: Double + + /** + * Start the resource context. + */ + public fun start() + + /** + * Stop the resource context. + */ + public override fun close() + + /** + * Invalidate the resource context's state. + * + * By invalidating the resource context's current state, the state is re-computed and the current progress is + * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the + * synchronous variant. + */ + public fun invalidate() + + /** + * Synchronously flush the progress of the resource context and materialize its current progress. + */ + public fun flush() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index b2759b7f..8dd1bd2b 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -37,7 +37,7 @@ public interface SimResourceDistributor : AutoCloseable { public val input: SimResourceProvider /** - * Add an output to the switch with the specified [capacity]. + * Create a new output for the distributor with the specified [capacity]. */ public fun addOutput(capacity: Double): SimResourceProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index a76cb1e3..e99f5eff 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -30,17 +30,18 @@ import kotlin.math.min */ public class SimResourceDistributorMaxMin( override val input: SimResourceProvider, - private val scheduler: SimResourceScheduler, + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null, private val listener: Listener? = null ) : SimResourceDistributor { override val outputs: Set get() = _outputs - private val _outputs = mutableSetOf() + private val _outputs = mutableSetOf() /** - * The active output contexts. + * The active outputs. */ - private val outputContexts: MutableList = mutableListOf() + private val activeOutputs: MutableList = mutableListOf() /** * The total speed requested by the output resources. @@ -72,6 +73,11 @@ public class SimResourceDistributorMaxMin( */ private var totalInterferedWork = 0.0 + /** + * The timestamp of the last report. + */ + private var lastReport: Long = Long.MIN_VALUE + /** * A flag to indicate that the switch is closed. */ @@ -121,10 +127,14 @@ public class SimResourceDistributorMaxMin( private val totalRemainingWork: Double get() = consumer.remainingWork + init { + input.startConsumer(consumer) + } + override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Distributor has been closed" } - val provider = OutputProvider(capacity) + val provider = Output(capacity) _outputs.add(provider) return provider } @@ -136,23 +146,12 @@ public class SimResourceDistributorMaxMin( } } - init { - input.startConsumer(consumer) - } - - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - input.interrupt() - } - /** * Schedule the work over the physical CPUs. */ private fun doSchedule(capacity: Double): SimResourceCommand { // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { + if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } @@ -164,25 +163,25 @@ public class SimResourceDistributorMaxMin( var totalRequestedWork = 0.0 // Flush the work of the outputs - var outputIterator = outputContexts.listIterator() + var outputIterator = activeOutputs.listIterator() while (outputIterator.hasNext()) { val output = outputIterator.next() - output.flush(isIntermediate = true) + output.pull() - if (output.activeCommand == SimResourceCommand.Exit) { - // Apparently the output consumer has exited, so remove it from the scheduling queue. + if (output.isFinished) { + // The output consumer has exited, so remove it from the scheduling queue. outputIterator.remove() } } // Sort the outputs based on their requested usage // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() + activeOutputs.sort() // Divide the available input capacity fairly across the outputs using max-min fair sharing - outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size + outputIterator = activeOutputs.listIterator() + var remaining = activeOutputs.size while (outputIterator.hasNext()) { val output = outputIterator.next() val availableShare = availableSpeed / remaining-- @@ -219,12 +218,12 @@ public class SimResourceDistributorMaxMin( } } - assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" } + assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork this.totalAllocatedSpeed = maxUsage - availableSpeed - this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)) return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) @@ -237,24 +236,28 @@ public class SimResourceDistributorMaxMin( */ private fun doNext(capacity: Double): SimResourceCommand { val totalRequestedWork = totalRequestedWork.toLong() - val totalRemainingWork = totalRemainingWork.toLong() val totalAllocatedWork = totalAllocatedWork.toLong() + val totalRemainingWork = totalRemainingWork.toLong() val totalRequestedSpeed = totalRequestedSpeed val totalAllocatedSpeed = totalAllocatedSpeed // Force all inputs to re-schedule their work. val command = doSchedule(capacity) - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork, - totalAllocatedWork - totalRemainingWork, - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalAllocatedSpeed, - totalRequestedSpeed - ) + val now = interpreter.clock.millis() + if (lastReport < now) { + // Report metrics + listener?.onSliceFinish( + this, + totalRequestedWork, + totalAllocatedWork - totalRemainingWork, + totalOvercommittedWork.toLong(), + totalInterferedWork.toLong(), + totalAllocatedSpeed, + totalRequestedSpeed + ) + lastReport = now + } totalInterferedWork = 0.0 totalOvercommittedWork = 0.0 @@ -283,102 +286,85 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class OutputProvider(val capacity: Double) : SimResourceProvider { + private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable { /** - * The [OutputContext] that is currently running. + * The current command that is processed by the resource. */ - private var ctx: OutputContext? = null + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - override var state: SimResourceState = SimResourceState.Pending - internal set + /** + * The processing speed that is allowed by the model constraints. + */ + var allowedSpeed: Double = 0.0 + + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + /** + * A flag to indicate that the output is finished. + */ + val isFinished + get() = activeCommand is SimResourceCommand.Exit - val ctx = OutputContext(this, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx + /** + * The timestamp at which we received the last command. + */ + private var lastCommandTimestamp: Long = Long.MIN_VALUE - ctx.start() - schedule() - } + /* SimAbstractResourceProvider */ + override fun createLogic(): SimResourceProviderLogic = this - override fun close() { - cancel() + override fun start(ctx: SimResourceControllableContext) { + activeOutputs += this - if (state != SimResourceState.Stopped) { - state = SimResourceState.Stopped - _outputs.remove(this) + interpreter.batch { + ctx.start() + // Interrupt the input to re-schedule the resources + input.interrupt() } } - override fun interrupt() { - ctx?.interrupt() - } + override fun close() { + val state = state - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } + super.close() if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending + _outputs.remove(this) } } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - val remainingWork = remainingWork - totalOvercommittedWork += remainingWork - } - override fun onIdle(deadline: Long) { - reportOvercommit() + /* SimResourceProviderLogic */ + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + reportOvercommit(ctx.remainingWork) allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) + lastCommandTimestamp = ctx.clock.millis() + + return Long.MAX_VALUE } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + reportOvercommit(ctx.remainingWork) - allowedSpeed = speed + allowedSpeed = ctx.speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) + lastCommandTimestamp = ctx.clock.millis() + + return Long.MAX_VALUE } - override fun onFinish() { - reportOvercommit() + override fun onFinish(ctx: SimResourceControllableContext) { + reportOvercommit(ctx.remainingWork) activeCommand = SimResourceCommand.Exit - provider.cancel() + lastCommandTimestamp = ctx.clock.millis() } - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { // Apply performance interference model val performanceScore = 1.0 @@ -401,27 +387,21 @@ public class SimResourceDistributorMaxMin( } } - private var isProcessing: Boolean = false + /* Comparable */ + override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed) - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - try { - isProcessing = false - - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } finally { - isProcessing = true + /** + * Pull the next command if necessary. + */ + fun pull() { + val ctx = ctx + if (ctx != null && lastCommandTimestamp < ctx.clock.millis()) { + ctx.flush() } } - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) + private fun reportOvercommit(remainingWork: Double) { + totalOvercommittedWork += remainingWork + } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt deleted file mode 100644 index f6a1a42e..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer. - */ -public interface SimResourceFlushable { - /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun flush(isIntermediate: Boolean) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt new file mode 100644 index 00000000..82631377 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The resource interpreter is responsible for managing the interaction between resource consumer and provider. + * + * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public interface SimResourceInterpreter { + /** + * The [Clock] associated with this interpreter. + */ + public val clock: Clock + + /** + * Create a new [SimResourceControllableContext] with the given [provider]. + * + * @param consumer The consumer logic. + * @param provider The logic of the resource provider. + * @param parent The system to which the resource context belongs. + */ + public fun newContext( + consumer: SimResourceConsumer, + provider: SimResourceProviderLogic, + parent: SimResourceSystem? = null + ): SimResourceControllableContext + + /** + * Start batching the execution of resource updates until [popBatch] is called. + * + * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs + * simultaneously) in a single state update. + * + * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the + * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called + * the same amount of times. To simplify batching, see [batch]. + */ + public fun pushBatch() + + /** + * Stop the batching of resource updates and run the interpreter on the batch. + * + * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. + */ + public fun popBatch() + + public companion object { + /** + * Construct a new [SimResourceInterpreter] implementation. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ + @JvmName("create") + public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter { + return SimResourceInterpreterImpl(context, clock) + } + } +} + +/** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ +public inline fun SimResourceInterpreter.batch(block: () -> Unit) { + try { + pushBatch() + block() + } finally { + popBatch() + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt new file mode 100644 index 00000000..22676984 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlin.math.max + +/** + * The logic of a resource provider. + */ +public interface SimResourceProviderLogic { + /** + * This method is invoked when the resource will idle until the specified [deadline]. + * + * @param ctx The context in which the provider runs. + * @param deadline The deadline that was requested by the resource consumer. + * @return The instant at which to resume the consumer. + */ + public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long + + /** + * This method is invoked when the resource will be consumed until the specified [work] was processed or the + * [deadline] was reached. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + * @param limit The limit on the work rate of the resource consumer. + * @param deadline The deadline that was requested by the resource consumer. + * @return The instant at which to resume the consumer. + */ + public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long + + /** + * This method is invoked when the resource consumer has finished. + */ + public fun onFinish(ctx: SimResourceControllableContext) + + /** + * Get the remaining work to process after a resource consumption. + * + * @param work The size of the resource consumption. + * @param speed The speed of consumption. + * @param duration The duration from the start of the consumption until now. + * @return The amount of work remaining. + */ + public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, work - processed) + } else { + 0.0 + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt deleted file mode 100644 index a228c47b..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock - -/** - * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource - * providers and consumers. - * - * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more - * efficiently, reducing the overall work needed per update. - */ -public interface SimResourceScheduler { - /** - * The [Clock] associated with this scheduler. - */ - public val clock: Clock - - /** - * Schedule a direct interrupt for the resource context represented by [flushable]. - * - * @param flushable The resource context that needs to be flushed. - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false) - - /** - * Schedule an interrupt in the future for the resource context represented by [flushable]. - * - * This method will override earlier calls to this method for the same [flushable]. - * - * @param flushable The resource context that needs to be flushed. - * @param timestamp The timestamp when the interrupt should happen. - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false) - - /** - * Batch the execution of several interrupts into a single call. - * - * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. - */ - public fun batch(block: () -> Unit) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt deleted file mode 100644 index cdbb4a6c..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.utils.TimerScheduler -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.CoroutineContext - -/** - * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after. - * - * @param clock The virtual simulation clock. - */ -public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler { - /** - * The [TimerScheduler] to actually schedule the interrupts. - */ - private val timers = TimerScheduler(context, clock) - - /** - * A flag to indicate that an interrupt is currently running already. - */ - private var isRunning: Boolean = false - - /** - * The queue of resources to be flushed. - */ - private val queue = ArrayDeque>() - - override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) { - queue.add(flushable to isIntermediate) - - if (isRunning) { - return - } - - flush() - } - - override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) { - timers.startSingleTimerTo(flushable, timestamp) { - schedule(flushable, isIntermediate) - } - } - - override fun batch(block: () -> Unit) { - val wasAlreadyRunning = isRunning - try { - isRunning = true - block() - } finally { - if (!wasAlreadyRunning) { - isRunning = false - } - } - } - - /** - * Flush the scheduled queue. - */ - private fun flush() { - val visited = mutableSetOf() - try { - isRunning = true - while (queue.isNotEmpty()) { - val (flushable, isIntermediate) = queue.poll() - flushable.flush(isIntermediate) - visited.add(flushable) - } - } finally { - isRunning = false - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3277b889..d984d2a5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,15 +26,17 @@ import kotlin.math.ceil import kotlin.math.min /** - * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. + * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. * * @param initialCapacity The initial capacity of the resource. - * @param scheduler The scheduler to schedule the interrupts. + * @param interpreter The interpreter that is used for managing the resource contexts. + * @param parent The parent resource system. */ public class SimResourceSource( initialCapacity: Double, - private val scheduler: SimResourceScheduler -) : SimResourceProvider { + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null +) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { /** * The current processing speed of the resource. */ @@ -44,80 +46,30 @@ public class SimResourceSource( /** * The capacity of the resource. */ - public var capacity: Double = initialCapacity + public override var capacity: Double = initialCapacity set(value) { field = value ctx?.capacity = value } - /** - * The [Context] that is currently running. - */ - private var ctx: Context? = null - - override var state: SimResourceState = SimResourceState.Pending - private set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } - val ctx = Context(consumer) - - this.ctx = ctx - this.state = SimResourceState.Active - - ctx.start() - } - - override fun close() { - cancel() - state = SimResourceState.Stopped - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - - /** - * Internal implementation of [SimResourceContext] for this class. - */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) { - override fun onIdle(deadline: Long) { - // Do not resume if deadline is "infinite" - if (deadline != Long.MAX_VALUE) { - scheduler.schedule(this, deadline) + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + return deadline } - } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - val until = min(deadline, clock.millis() + getDuration(work, speed)) - scheduler.schedule(this, until) - } - - override fun onFinish() { - cancel() - - ctx = null + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + return min(deadline, ctx.clock.millis() + getDuration(work, speed)) + } - if (this@SimResourceSource.state != SimResourceState.Stopped) { - this@SimResourceSource.state = SimResourceState.Pending + override fun onFinish(ctx: SimResourceControllableContext) { + cancel() } } - - override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + override fun toString(): String = "SimResourceSource[capacity=$capacity]" + /** * Compute the duration that a resource consumption will take with the specified [speed]. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 1a9dd0bc..7f1bb2b7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -90,7 +90,6 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { ) : SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. - _outputs -= this availableResources += forwarder } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 5dc1e68d..61887e34 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -22,14 +22,13 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.* - /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ public class SimResourceSwitchMaxMin( - scheduler: SimResourceScheduler, + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null, private val listener: Listener? = null ) : SimResourceSwitch { private val _outputs = mutableSetOf() @@ -48,13 +47,13 @@ public class SimResourceSwitchMaxMin( /** * The aggregator to aggregate the resources. */ - private val aggregator = SimResourceAggregatorMaxMin(scheduler) + private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent) /** * The distributor to distribute the aggregated resources. */ private val distributor = SimResourceDistributorMaxMin( - aggregator.output, scheduler, + aggregator.output, interpreter, parent, object : SimResourceDistributorMaxMin.Listener { override fun onSliceFinish( switch: SimResourceDistributor, @@ -71,7 +70,7 @@ public class SimResourceSwitchMaxMin( ) /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch with the specified [capacity]. */ override fun addOutput(capacity: Double): SimResourceProvider { check(!isClosed) { "Switch has been closed" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt new file mode 100644 index 00000000..609262cb --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A system of possible multiple sub-resources. + * + * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the + * resource provider. + */ +public interface SimResourceSystem { + /** + * The parent system to which this system belongs or `null` if it has no parent. + */ + public val parent: SimResourceSystem? + + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param timestamp The timestamp at which the system converged. + */ + public fun onConverge(timestamp: Long) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt new file mode 100644 index 00000000..0b3f5de1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -0,0 +1,397 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.* +import java.time.Clock +import kotlin.math.min + +/** + * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + */ +internal class SimResourceContextImpl( + override val parent: SimResourceSystem?, + private val interpreter: SimResourceInterpreterImpl, + private val consumer: SimResourceConsumer, + private val logic: SimResourceProviderLogic +) : SimResourceControllableContext, SimResourceSystem { + /** + * The clock of the context. + */ + override val clock: Clock + get() = interpreter.clock + + /** + * The capacity of the resource. + */ + override var capacity: Double = 0.0 + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** + * The amount of work still remaining at this instant. + */ + override val remainingWork: Double + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(now).also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + + /** + * A flag to indicate the state of the context. + */ + override val state: SimResourceState + get() = _state + private var _state = SimResourceState.Pending + + /** + * The current processing speed of the resource. + */ + override val speed: Double + get() = _speed + private var _speed = 0.0 + + /** + * The current state of the resource context. + */ + private var _timestamp: Long = Long.MIN_VALUE + private var _work: Double = 0.0 + private var _limit: Double = 0.0 + private var _deadline: Long = Long.MAX_VALUE + + /** + * The update flag indicating why the update was triggered. + */ + private var _flag: Flag = Flag.None + + /** + * The current pending update. + */ + private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null + + override fun start() { + check(_state == SimResourceState.Pending) { "Consumer is already started" } + interpreter.batch { + consumer.onEvent(this, SimResourceEvent.Start) + _state = SimResourceState.Active + interrupt() + } + } + + override fun close() { + if (_state != SimResourceState.Stopped) { + interpreter.batch { + _state = SimResourceState.Stopped + doStop() + } + } + } + + override fun interrupt() { + if (_state == SimResourceState.Stopped) { + return + } + + enableFlag(Flag.Interrupt) + scheduleUpdate() + } + + override fun invalidate() { + if (_state == SimResourceState.Stopped) { + return + } + + enableFlag(Flag.Invalidate) + scheduleUpdate() + } + + override fun flush() { + if (_state == SimResourceState.Stopped) { + return + } + + doUpdate(clock.millis()) + } + + /** + * Determine whether the state of the resource context should be updated. + */ + fun requiresUpdate(timestamp: Long): Boolean { + // Either the resource context is flagged or there is a pending update at this timestamp + return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp + } + + /** + * Update the state of the resource context. + */ + fun doUpdate(timestamp: Long) { + try { + val oldState = _state + val newState = doUpdate(timestamp, oldState) + + _state = newState + _flag = Flag.None + + when (newState) { + SimResourceState.Pending -> + if (oldState != SimResourceState.Pending) { + throw IllegalStateException("Illegal transition to pending state") + } + SimResourceState.Stopped -> + if (oldState != SimResourceState.Stopped) { + doStop() + } + else -> {} + } + } catch (cause: Throwable) { + doFail(cause) + } finally { + _remainingWorkFlush = Long.MIN_VALUE + _timestamp = timestamp + } + } + + override fun onConverge(timestamp: Long) { + if (_state == SimResourceState.Active) { + consumer.onEvent(this, SimResourceEvent.Run) + } + } + + override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]" + + /** + * Update the state of the resource context. + */ + private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState { + return when (state) { + // Resource context is not active, so its state will not update + SimResourceState.Pending, SimResourceState.Stopped -> state + SimResourceState.Active -> { + val isInterrupted = _flag == Flag.Interrupt + val remainingWork = remainingWork + val isConsume = _limit > 0.0 + + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource capacity cannot satisfy the demand. + // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { + next(timestamp) + } else if (isConsume) { + interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp) + } else { + interpret(SimResourceCommand.Idle(_deadline), timestamp) + } + } + } + } + + /** + * Stop the resource context. + */ + private fun doStop() { + try { + consumer.onEvent(this, SimResourceEvent.Exit) + logic.onFinish(this) + } catch (cause: Throwable) { + doFail(cause) + } + } + + /** + * Fail the resource consumer. + */ + private fun doFail(cause: Throwable) { + try { + consumer.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + logic.onFinish(this) + } + + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand, now: Long): SimResourceState { + return when (command) { + is SimResourceCommand.Idle -> { + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = deadline + + val timestamp = logic.onIdle(this, deadline) + scheduleUpdate(timestamp) + + SimResourceState.Active + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + _speed = min(capacity, limit) + _work = work + _limit = limit + _deadline = deadline + + val timestamp = logic.onConsume(this, work, limit, deadline) + scheduleUpdate(timestamp) + + SimResourceState.Active + } + is SimResourceCommand.Exit -> { + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = Long.MAX_VALUE + + SimResourceState.Stopped + } + } + } + + /** + * Request the workload for more work. + */ + private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the current state. + */ + private fun computeRemainingWork(now: Long): Double { + return if (_work > 0.0) + logic.getRemainingWork(this, _work, speed, now - _timestamp) + else 0.0 + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + + interpreter.batch { + // Inform the consumer of the capacity change. This might already trigger an interrupt. + consumer.onEvent(this, SimResourceEvent.Capacity) + + // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command. + if (isThrottled) { + invalidate() + } + } + } + + /** + * Enable the specified [flag] taking into account precedence. + */ + private fun enableFlag(flag: Flag) { + _flag = when (_flag) { + Flag.None -> flag + Flag.Invalidate -> + when (flag) { + Flag.None -> flag + else -> flag + } + Flag.Interrupt -> + when (flag) { + Flag.None, Flag.Invalidate -> flag + else -> flag + } + } + } + + /** + * Schedule an update for this resource context. + */ + private fun scheduleUpdate() { + // Cancel the pending update + val pendingUpdate = _pendingUpdate + if (pendingUpdate != null) { + _pendingUpdate = null + pendingUpdate.cancel() + } + + interpreter.scheduleImmediate(this) + } + + /** + * Schedule a delayed update for this resource context. + */ + private fun scheduleUpdate(timestamp: Long) { + val pendingUpdate = _pendingUpdate + if (pendingUpdate != null) { + if (pendingUpdate.timestamp == timestamp) { + // Fast-path: A pending update for the same timestamp already exists + return + } else { + // Cancel the old pending update + _pendingUpdate = null + pendingUpdate.cancel() + } + } + + if (timestamp != Long.MAX_VALUE) { + _pendingUpdate = interpreter.scheduleDelayed(this, timestamp) + } + } + + /** + * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or + * interrupted. + */ + enum class Flag { + None, + Interrupt, + Invalidate + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt new file mode 100644 index 00000000..d09e1b45 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -0,0 +1,300 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Runnable +import org.opendc.simulator.resources.* +import java.time.Clock +import java.util.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ +internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The queue of resource updates that are scheduled for immediate execution. + */ + private val queue = ArrayDeque() + + /** + * A priority queue containing the resource updates to be scheduled in the future. + */ + private val futureQueue = PriorityQueue() + + /** + * The stack of interpreter invocations to occur in the future. + */ + private val futureInvocations = ArrayDeque() + + /** + * The index in the batch stack. + */ + private var batchIndex = 0 + + /** + * A flag to indicate that the interpreter is currently active. + */ + private val isRunning: Boolean + get() = batchIndex > 0 + + /** + * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. + * + * This method should be used when the state of a resource context is invalidated/interrupted and needs to be + * re-computed. In case no interpreter is currently active, the interpreter will be started. + */ + fun scheduleImmediate(ctx: SimResourceContextImpl) { + queue.add(Update(ctx, Long.MIN_VALUE)) + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + + /** + * Schedule the interpreter to run at [timestamp] to update the resource contexts. + * + * This method will override earlier calls to this method for the same [ctx]. + * + * @param ctx The resource context to which the event applies. + * @param timestamp The timestamp when the interrupt should happen. + */ + fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update { + val now = clock.millis() + val futureQueue = futureQueue + + require(timestamp >= now) { "Timestamp must be in the future" } + + val update = Update(ctx, timestamp) + futureQueue.add(update) + + // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference. + if (futureQueue.peek() === update) { + trySchedule(futureQueue, futureInvocations) + } + + return update + } + + override fun newContext( + consumer: SimResourceConsumer, + provider: SimResourceProviderLogic, + parent: SimResourceSystem? + ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider) + + override fun pushBatch() { + batchIndex++ + } + + override fun popBatch() { + try { + // Flush the work if the platform is not already running + if (batchIndex == 1 && queue.isNotEmpty()) { + runInterpreter() + } + } finally { + batchIndex-- + } + } + + /** + * Interpret all actions that are scheduled for the current timestamp. + */ + private fun runInterpreter() { + val now = clock.millis() + val queue = queue + val futureQueue = futureQueue + val futureInvocations = futureInvocations + val visited = linkedSetOf() + + // Execute all scheduled updates at current timestamp + while (true) { + val update = futureQueue.peek() ?: break + + assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" } + + if (update.timestamp > now && !update.isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(futureQueue, futureInvocations) + break + } + + futureQueue.poll() + + if (update(now) && visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } + } + + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val update = queue.poll() ?: break + if (update(now) && visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } + } + + for (system in visited) { + system.onConverge(now) + } + } while (queue.isNotEmpty()) + } + + /** + * Try to schedule the next interpreter event. + */ + private fun trySchedule(queue: PriorityQueue, scheduled: ArrayDeque) { + val nextTimer = queue.peek() + val now = clock.millis() + + // Check whether we need to update our schedule: + if (nextTimer == null) { + // Case 1: all timers are cancelled + for (invocation in scheduled) { + invocation.cancel() + } + scheduled.clear() + return + } + + while (true) { + val invocation = scheduled.peekFirst() + if (invocation == null || invocation.timestamp > nextTimer.timestamp) { + // Case 2: A new timer was registered ahead of the other timers. + // Solution: Schedule a new scheduler invocation + val nextTimestamp = nextTimer.timestamp + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout( + nextTimestamp - now, + { + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + }, + context + ) + scheduled.addFirst(Invocation(nextTimestamp, handle)) + break + } else if (invocation.timestamp < nextTimer.timestamp) { + // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted + // Solution: Cancel the next scheduler invocation + invocation.cancel() + scheduled.pollFirst() + } else { + break + } + } + } + + /** + * Collect all the ancestors of the specified [system]. + */ + private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet) { + val parent = system.parent + if (parent != null) { + systems.add(parent) + collectAncestors(parent, systems) + } + } + + /** + * A future interpreter invocation. + * + * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private data class Invocation( + @JvmField val timestamp: Long, + @JvmField private val disposableHandle: DisposableHandle + ) { + /** + * Cancel the interpreter invocation. + */ + fun cancel() = disposableHandle.dispose() + } + + /** + * An update call for [ctx] that is scheduled for [timestamp]. + * + * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be + * cancelled if the resource context was invalidated in the meantime. + */ + class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable { + /** + * A flag to indicate that the task has been cancelled. + */ + @JvmField + var isCancelled: Boolean = false + + /** + * Cancel the update. + */ + fun cancel() { + isCancelled = true + } + + /** + * Immediately run update. + */ + operator fun invoke(timestamp: Long): Boolean { + val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp) + if (shouldExecute) { + ctx.doUpdate(timestamp) + } + return shouldExecute + } + + override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp) + + override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]" + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 2b32300e..994ae888 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceAggregatorMaxMin] class. @@ -41,7 +42,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimResourceAggregatorMaxMinTest { @Test fun testSingleCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val forwarder = SimResourceForwarder() @@ -72,7 +73,7 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -99,7 +100,7 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testOvercommit() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -126,7 +127,7 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testException() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -151,7 +152,7 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -176,7 +177,7 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 2e2d6588..6cb507ce 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -26,98 +26,109 @@ import io.mockk.* import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.impl.SimResourceContextImpl +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** - * A test suite for the [SimAbstractResourceContext] class. + * A test suite for the [SimResourceContextImpl] class. */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) - context.flush(isIntermediate = false) + context.doUpdate(interpreter.clock.millis()) } @Test fun testIntermediateFlush() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline }) + val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path - context.flush(isIntermediate = true) + context.doUpdate(interpreter.clock.millis()) - verify(exactly = 2) { context.onConsume(any(), any(), any()) } + verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } } @Test fun testIntermediateFlushIdle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline }) + val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() delay(5) - context.flush(isIntermediate = true) + context.invalidate() delay(5) - context.flush(isIntermediate = true) + context.invalidate() assertAll( - { verify(exactly = 2) { context.onIdle(any()) } }, - { verify(exactly = 1) { context.onFinish() } } + { verify(exactly = 2) { logic.onIdle(any(), any()) } }, + { verify(exactly = 1) { logic.onFinish(any()) } } ) } @Test fun testDoubleStart() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() - assertThrows { context.start() } + + assertThrows { + context.start() + } } @Test fun testIdempodentCapacityChange() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) + context.capacity = 4200.0 context.start() context.capacity = 4200.0 @@ -126,17 +137,19 @@ class SimResourceContextTest { @Test fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} - } + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + }) + + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 5e86088d..08d88093 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceSource] class. @@ -40,7 +41,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimResourceSourceTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -63,7 +64,7 @@ class SimResourceSourceTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -83,7 +84,7 @@ class SimResourceSourceTest { @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -110,7 +111,7 @@ class SimResourceSourceTest { */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -133,7 +134,7 @@ class SimResourceSourceTest { @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) lateinit var resCtx: SimResourceContext @@ -174,7 +175,7 @@ class SimResourceSourceTest { @Test fun testFailure() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -193,7 +194,7 @@ class SimResourceSourceTest { @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -213,7 +214,7 @@ class SimResourceSourceTest { @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -236,7 +237,7 @@ class SimResourceSourceTest { @Test fun testClosedConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -257,7 +258,7 @@ class SimResourceSourceTest { @Test fun testCloseDuringConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -279,7 +280,7 @@ class SimResourceSourceTest { @Test fun testIdle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -301,7 +302,7 @@ class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows { runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -321,7 +322,7 @@ class SimResourceSourceTest { @Test fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 32b6d8ad..517dcb36 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -44,7 +45,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val speed = mutableListOf() @@ -86,7 +87,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk(relaxUnitFun = true) @@ -113,7 +114,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer { @@ -158,7 +159,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk(relaxUnitFun = true) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index e7dec172..0b023878 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitch] implementations @@ -40,7 +41,7 @@ import org.opendc.simulator.resources.consumer.SimTraceConsumer internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val switch = SimResourceSwitchMaxMin(scheduler) val sources = List(2) { SimResourceSource(2000.0, scheduler) } @@ -64,7 +65,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L @@ -97,7 +98,7 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(scheduler, listener) + val switch = SimResourceSwitchMaxMin(scheduler, null, listener) val provider = switch.addOutput(3200.0) try { @@ -121,7 +122,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L @@ -163,7 +164,7 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(scheduler, listener) + val switch = SimResourceSwitchMaxMin(scheduler, null, listener) val providerA = switch.addOutput(3200.0) val providerB = switch.addOutput(3200.0) @@ -180,8 +181,8 @@ internal class SimResourceSwitchMaxMinTest { switch.close() } assertAll( - { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, { assertEquals(1200000, clock.millis()) } ) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 880e1755..04886399 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceTransformer] class. @@ -41,7 +42,7 @@ internal class SimResourceTransformerTest { @Test fun testExitImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) launch { @@ -61,7 +62,7 @@ internal class SimResourceTransformerTest { @Test fun testExit() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) launch { @@ -122,7 +123,7 @@ internal class SimResourceTransformerTest { @Test fun testCancelStartedDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk(relaxUnitFun = true) @@ -141,7 +142,7 @@ internal class SimResourceTransformerTest { @Test fun testCancelPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk(relaxUnitFun = true) @@ -160,7 +161,7 @@ internal class SimResourceTransformerTest { @Test fun testExitPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk(relaxUnitFun = true) @@ -176,7 +177,7 @@ internal class SimResourceTransformerTest { @Test fun testAdjustCapacity() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -195,7 +196,7 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index ac8b5814..db4fe856 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimWorkConsumer] class. @@ -35,7 +36,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimWorkConsumerTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 1.0) @@ -50,7 +51,7 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 0.5) -- cgit v1.2.3 From cc87c9ad0b8e4ed3fa4fbad4ab94c5e53948ef3c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 2 Jun 2021 13:03:10 +0200 Subject: simulator: Add uniform interface for resource metrics This change adds a new interface to the resources library for accessing metrics of resources such as work, demand and overcommitted work. With this change, we do not need an implementation specific listener interface in SimResourceSwitchMaxMin anymore. Another benefit of this approach is that updates will be scheduled more efficiently and progress will only be reported once the system has reached a steady-state for that timestamp. --- .../simulator/compute/SimAbstractHypervisor.kt | 16 +- .../opendc/simulator/compute/SimAbstractMachine.kt | 6 +- .../simulator/compute/SimBareMetalMachine.kt | 31 ++- .../simulator/compute/SimFairShareHypervisor.kt | 61 +++-- .../compute/SimFairShareHypervisorProvider.kt | 9 +- .../simulator/compute/SimHypervisorProvider.kt | 7 +- .../opendc/simulator/compute/SimProcessingUnit.kt | 5 - .../compute/SimSpaceSharedHypervisorProvider.kt | 9 +- .../opendc/simulator/compute/SimHypervisorTest.kt | 4 +- .../simulator/resources/SimResourceBenchmarks.kt | 8 +- .../resources/SimAbstractResourceAggregator.kt | 105 +++++---- .../resources/SimAbstractResourceProvider.kt | 39 +++- .../simulator/resources/SimResourceAggregator.kt | 12 +- .../resources/SimResourceAggregatorMaxMin.kt | 4 +- .../simulator/resources/SimResourceContext.kt | 5 + .../simulator/resources/SimResourceCounters.kt | 48 ++++ .../simulator/resources/SimResourceDistributor.kt | 11 +- .../resources/SimResourceDistributorMaxMin.kt | 246 ++++++--------------- .../simulator/resources/SimResourceProvider.kt | 22 +- .../resources/SimResourceProviderLogic.kt | 14 +- .../simulator/resources/SimResourceSource.kt | 19 +- .../simulator/resources/SimResourceSwitch.kt | 9 +- .../resources/SimResourceSwitchExclusive.kt | 26 ++- .../simulator/resources/SimResourceSwitchMaxMin.kt | 69 ++---- .../simulator/resources/SimResourceTransformer.kt | 36 +++ .../resources/impl/SimResourceContextImpl.kt | 27 ++- .../resources/impl/SimResourceCountersImpl.kt | 42 ++++ .../resources/impl/SimResourceInterpreterImpl.kt | 33 ++- .../resources/SimResourceAggregatorMaxMinTest.kt | 50 ++++- .../resources/SimResourceSwitchExclusiveTest.kt | 10 +- .../resources/SimResourceSwitchMaxMinTest.kt | 64 +----- .../resources/SimResourceTransformerTest.kt | 17 ++ 32 files changed, 625 insertions(+), 439 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 2df307d3..2b84a17c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -166,13 +166,23 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource /** * The actual resource supporting the processing unit. */ - private val source = switch.addOutput(model.frequency) - - override val speed: Double = 0.0 /* TODO Implement */ + private val source = switch.newOutput() override val state: SimResourceState get() = source.state + override val capacity: Double + get() = source.capacity + + override val speed: Double + get() = source.speed + + override val demand: Double + get() = source.demand + + override val counters: SimResourceCounters + get() = source.counters + override fun startConsumer(consumer: SimResourceConsumer) { source.startConsumer(consumer) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index de2b3eef..d40cdac5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -37,8 +37,12 @@ import java.time.Clock * Abstract implementation of the [SimMachine] interface. * * @param interpreter The interpreter to manage the machine's resources. + * @param parent The parent simulation system. */ -public abstract class SimAbstractMachine(protected val interpreter: SimResourceInterpreter) : SimMachine, SimResourceSystem { +public abstract class SimAbstractMachine( + protected val interpreter: SimResourceInterpreter, + final override val parent: SimResourceSystem? +) : SimMachine, SimResourceSystem { private val _usage = MutableStateFlow(0.0) override val usage: StateFlow get() = _usage diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f5218ba9..082719e2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -28,25 +28,27 @@ import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* import org.opendc.simulator.resources.SimResourceInterpreter -import kotlin.coroutines.* /** * A simulated bare-metal machine that is able to run a single workload. * * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For - * example. the class expects only a single concurrent call to [run]. + * example. The class expects only a single concurrent call to [run]. * - * @param context The [CoroutineContext] to run the simulated workload in. - * @param clock The virtual clock to track the simulation time. + * @param interpreter The [SimResourceInterpreter] to drive the simulation. * @param model The machine model to simulate. + * @param scalingGovernor The CPU frequency scaling governor to use. + * @param scalingDriver The CPU frequency scaling driver to use. + * @param parent The parent simulation system. */ @OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( - platform: SimResourceInterpreter, + interpreter: SimResourceInterpreter, override val model: SimMachineModel, scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver -) : SimAbstractMachine(platform) { + scalingDriver: ScalingDriver, + parent: SimResourceSystem? = null, +) : SimAbstractMachine(interpreter, parent) { override val cpus: List = model.cpus.map { ProcessingUnitImpl(it) } /** @@ -61,8 +63,6 @@ public class SimBareMetalMachine( scalingGovernor.createLogic(this.scalingDriver.createContext(cpu)) } - override val parent: SimResourceSystem? = null - init { scalingGovernors.forEach { it.onStart() } } @@ -89,11 +89,20 @@ public class SimBareMetalMachine( */ private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine) + override val state: SimResourceState + get() = source.state + + override val capacity: Double + get() = source.capacity + override val speed: Double get() = source.speed - override val state: SimResourceState - get() = source.state + override val demand: Double + get() = source.demand + + override val counters: SimResourceCounters + get() = source.counters override fun startConsumer(consumer: SimResourceConsumer) { source.startConsumer(consumer) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 33e7e637..3ceb3291 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -26,33 +26,62 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSwitch import org.opendc.simulator.resources.SimResourceSwitchMaxMin +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single * [SimBareMetalMachine] concurrently using weighted fair sharing. * + * @param interpreter The interpreter to manage the machine's resources. + * @param parent The parent simulation system. * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val interpreter: SimResourceInterpreter, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor(interpreter) { +public class SimFairShareHypervisor( + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null, + private val listener: SimHypervisor.Listener? = null +) : SimAbstractHypervisor(interpreter) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SimResourceSwitchMaxMin( - interpreter, null, - object : SimResourceSwitchMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } + return SwitchSystem().switch + } + + private inner class SwitchSystem : SimResourceSystem { + val switch = SimResourceSwitchMaxMin(interpreter, this) + + override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent + + private var lastCpuUsage = 0.0 + private var lastCpuDemand = 0.0 + private var lastDemand = 0.0 + private var lastActual = 0.0 + private var lastOvercommit = 0.0 + private var lastReport = Long.MIN_VALUE + + override fun onConverge(timestamp: Long) { + val listener = listener ?: return + val counters = switch.counters + + if (timestamp > lastReport) { + listener.onSliceFinish( + this@SimFairShareHypervisor, + (counters.demand - lastDemand).toLong(), + (counters.actual - lastActual).toLong(), + (counters.overcommit - lastOvercommit).toLong(), + 0L, + lastCpuUsage, + lastCpuDemand + ) } - ) + lastReport = timestamp + + lastCpuDemand = switch.inputs.sumOf { it.demand } + lastCpuUsage = switch.inputs.sumOf { it.speed } + lastDemand = counters.demand + lastActual = counters.actual + lastOvercommit = counters.overcommit + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index 68858cc1..d3206196 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" - override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor { - return SimFairShareHypervisor(interpreter, listener) - } + override fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem?, + listener: SimHypervisor.Listener? + ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt index d0753084..8e8c3698 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A service provider interface for constructing a [SimHypervisor]. @@ -39,5 +40,9 @@ public interface SimHypervisorProvider { /** * Create a [SimHypervisor] instance with the specified [listener]. */ - public fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener? = null): SimHypervisor + public fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null, + listener: SimHypervisor.Listener? = null + ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt index 13c7d9b2..136a543a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt @@ -33,9 +33,4 @@ public interface SimProcessingUnit : SimResourceProvider { * The model representing the static properties of the processing unit. */ public val model: ProcessingUnit - - /** - * The current speed of the processing unit. - */ - public val speed: Double } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index c017c8ab..923b5bab 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor { - return SimSpaceSharedHypervisor(interpreter) - } + override fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem?, + listener: SimHypervisor.Listener? + ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index c1b5089c..1709cc23 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -96,7 +96,7 @@ internal class SimHypervisorTest { val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, listener) + val hypervisor = SimFairShareHypervisor(platform, null, listener) launch { machine.run(hypervisor) @@ -171,7 +171,7 @@ internal class SimHypervisorTest { platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, listener) + val hypervisor = SimFairShareHypervisor(platform, null, listener) launch { machine.run(hypervisor) diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index 9233c72d..b45b2a2f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -90,7 +90,7 @@ class SimResourceBenchmarks { switch.addInput(SimResourceSource(3000.0, interpreter)) switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -105,7 +105,7 @@ class SimResourceBenchmarks { repeat(3) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } @@ -120,7 +120,7 @@ class SimResourceBenchmarks { switch.addInput(SimResourceSource(3000.0, interpreter)) switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -135,7 +135,7 @@ class SimResourceBenchmarks { repeat(2) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index be04d399..5fe7d7bb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -42,7 +42,7 @@ public abstract class SimAbstractResourceAggregator( /** * This method is invoked when the resource consumer finishes processing. */ - protected abstract fun doFinish(cause: Throwable?) + protected abstract fun doFinish() /** * This method is invoked when an input context is started. @@ -54,8 +54,9 @@ public abstract class SimAbstractResourceAggregator( */ protected abstract fun onInputFinished(input: Input) + /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { - check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + check(state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) @@ -63,44 +64,77 @@ public abstract class SimAbstractResourceAggregator( input.startConsumer(consumer) } - override fun close() { - output.close() - } - - override val output: SimResourceProvider - get() = _output - private val _output = SimResourceForwarder() - override val inputs: Set get() = _inputs private val _inputs = mutableSetOf() private val _inputConsumers = mutableListOf() - protected val outputContext: SimResourceContext - get() = context - private val context = interpreter.newContext( - _output, - object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - doIdle(deadline) - return Long.MAX_VALUE - } + /* SimResourceProvider */ + override val state: SimResourceState + get() = _output.state - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - doConsume(work, limit, deadline) - return Long.MAX_VALUE - } + override val capacity: Double + get() = _output.capacity - override fun onFinish(ctx: SimResourceControllableContext) { - doFinish(null) - } + override val speed: Double + get() = _output.speed + + override val demand: Double + get() = _output.demand + + override val counters: SimResourceCounters + get() = _output.counters + + override fun startConsumer(consumer: SimResourceConsumer) { + _output.startConsumer(consumer) + } + + override fun cancel() { + _output.cancel() + } + + override fun interrupt() { + _output.interrupt() + } + + override fun close() { + _output.close() + } + + private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + doIdle(deadline) + return Long.MAX_VALUE + } + + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + doConsume(work, limit, deadline) + return Long.MAX_VALUE + } + + override fun onFinish(ctx: SimResourceControllableContext) { + doFinish() + } + + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } - override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return _inputConsumers.sumOf { it.remainingWork } + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return _inputConsumers.sumOf { it.remainingWork } + } } - }, - parent - ) + } + + /** + * Flush the progress of the output if possible. + */ + fun flush() { + ctx?.flush() + } + } /** * An input for the resource aggregator. @@ -141,7 +175,7 @@ public abstract class SimAbstractResourceAggregator( private fun updateCapacity() { // Adjust capacity of output resource - context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } + _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } } /* Input */ @@ -158,7 +192,7 @@ public abstract class SimAbstractResourceAggregator( this.command = null next } else { - context.flush() + _output.flush() next = command this.command = null @@ -172,11 +206,6 @@ public abstract class SimAbstractResourceAggregator( _ctx = ctx updateCapacity() - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - onInputStarted(this) } SimResourceEvent.Capacity -> updateCapacity() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 519c2615..de26f99e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -22,27 +22,49 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + /** * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. */ public abstract class SimAbstractResourceProvider( private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, + private val parent: SimResourceSystem?, initialCapacity: Double ) : SimResourceProvider { /** * The capacity of the resource. */ - public open var capacity: Double = initialCapacity - protected set(value) { + public override var capacity: Double = initialCapacity + set(value) { field = value ctx?.capacity = value } + /** + * The current processing speed of the resource. + */ + public override val speed: Double + get() = ctx?.speed ?: 0.0 + + /** + * The resource processing speed demand at this instant. + */ + public override val demand: Double + get() = ctx?.demand ?: 0.0 + + /** + * The resource counters to track the execution metrics of the resource. + */ + public override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + /** * The [SimResourceControllableContext] that is currently running. */ protected var ctx: SimResourceControllableContext? = null + private set /** * The state of the resource provider. @@ -62,6 +84,17 @@ public abstract class SimAbstractResourceProvider( ctx.start() } + /** + * Update the counters of the resource provider. + */ + protected fun updateCounters(ctx: SimResourceContext, work: Double) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += work + counters.actual += work - remainingWork + counters.overcommit += remainingWork + } + final override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } val ctx = interpreter.newContext(consumer, createLogic(), parent) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt index 5c0346cd..00972f43 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -25,12 +25,7 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. */ -public interface SimResourceAggregator : AutoCloseable { - /** - * The output resource provider to which resource consumers can be attached. - */ - public val output: SimResourceProvider - +public interface SimResourceAggregator : SimResourceProvider { /** * The input resources that will be switched between the output providers. */ @@ -40,9 +35,4 @@ public interface SimResourceAggregator : AutoCloseable { * Add the specified [input] to the aggregator. */ public fun addInput(input: SimResourceProvider) - - /** - * End the lifecycle of the aggregator. - */ - public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index bdab6def..c39c1aca 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -38,7 +38,7 @@ public class SimResourceAggregatorMaxMin( // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { val inputCapacity = input.ctx.capacity - val fraction = inputCapacity / outputContext.capacity + val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work @@ -56,7 +56,7 @@ public class SimResourceAggregatorMaxMin( } } - override fun doFinish(cause: Throwable?) { + override fun doFinish() { val iterator = consumers.iterator() for (input in iterator) { iterator.remove() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 7c76c634..0d9a6106 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -44,6 +44,11 @@ public interface SimResourceContext { */ public val speed: Double + /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + /** * The amount of work still remaining at this instant. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt new file mode 100644 index 00000000..725aa5bc --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * An interface that tracks cumulative counts of the work performed by a resource. + */ +public interface SimResourceCounters { + /** + * The amount of work that resource consumers wanted the resource to perform. + */ + public val demand: Double + + /** + * The amount of work performed by the resource. + */ + public val actual: Double + + /** + * The amount of work that could not be completed due to overcommitted resources. + */ + public val overcommit: Double + + /** + * Reset the resource counters. + */ + public fun reset() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index 8dd1bd2b..e0333ff9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -25,19 +25,14 @@ package org.opendc.simulator.resources /** * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ -public interface SimResourceDistributor : AutoCloseable { +public interface SimResourceDistributor : SimResourceConsumer { /** * The output resource providers to which resource consumers can be attached. */ public val outputs: Set /** - * The input resource that will be distributed over the consumers. + * Create a new output for the distributor. */ - public val input: SimResourceProvider - - /** - * Create a new output for the distributor with the specified [capacity]. - */ - public fun addOutput(capacity: Double): SimResourceProvider + public fun newOutput(): SimResourceProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index e99f5eff..be9e89fb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -29,24 +29,22 @@ import kotlin.math.min * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. */ public class SimResourceDistributorMaxMin( - override val input: SimResourceProvider, private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, - private val listener: Listener? = null + private val parent: SimResourceSystem? = null ) : SimResourceDistributor { override val outputs: Set get() = _outputs private val _outputs = mutableSetOf() /** - * The active outputs. + * The resource context of the consumer. */ - private val activeOutputs: MutableList = mutableListOf() + private var ctx: SimResourceContext? = null /** - * The total speed requested by the output resources. + * The active outputs. */ - private var totalRequestedSpeed = 0.0 + private val activeOutputs: MutableList = mutableListOf() /** * The total amount of work requested by the output resources. @@ -58,145 +56,83 @@ public class SimResourceDistributorMaxMin( */ private var totalAllocatedSpeed = 0.0 - /** - * The total allocated work requested for the output resources. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * The timestamp of the last report. - */ - private var lastReport: Long = Long.MIN_VALUE - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private val consumer = object : SimResourceConsumer { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext - - val remainingWork: Double - get() = ctx.remainingWork - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - } - SimResourceEvent.Exit -> { - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - else -> {} - } - } + /* SimResourceDistributor */ + override fun newOutput(): SimResourceProvider { + val provider = Output(ctx?.capacity ?: 0.0) + _outputs.add(provider) + return provider } - /** - * The total amount of remaining work. - */ - private val totalRemainingWork: Double - get() = consumer.remainingWork - - init { - input.startConsumer(consumer) + /* SimResourceConsumer */ + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) } - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Distributor has been closed" } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + updateCapacity(ctx) + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() - val provider = Output(capacity) - _outputs.add(provider) - return provider - } + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() - override fun close() { - if (!isClosed) { - isClosed = true - input.cancel() + output.close() + } + } + SimResourceEvent.Capacity -> updateCapacity(ctx) + else -> {} } } /** - * Schedule the work over the physical CPUs. + * Schedule the work of the outputs. */ - private fun doSchedule(capacity: Double): SimResourceCommand { - // If there is no work yet, mark all inputs as idle. + private fun doNext(capacity: Double): SimResourceCommand { + // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } - val maxUsage = capacity var duration: Double = Double.MAX_VALUE var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage + var availableSpeed = capacity var totalRequestedSpeed = 0.0 var totalRequestedWork = 0.0 - // Flush the work of the outputs - var outputIterator = activeOutputs.listIterator() - while (outputIterator.hasNext()) { - val output = outputIterator.next() - + // Pull in the work of the outputs + val outputIterator = activeOutputs.listIterator() + for (output in outputIterator) { output.pull() + // Remove outputs that have finished if (output.isFinished) { - // The output consumer has exited, so remove it from the scheduling queue. outputIterator.remove() } } - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + // Sort in-place the outputs based on their requested usage. + // Profiling shows that it is faster than maintaining some kind of sorted set. activeOutputs.sort() // Divide the available input capacity fairly across the outputs using max-min fair sharing - outputIterator = activeOutputs.listIterator() var remaining = activeOutputs.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() + for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- when (val command = output.activeCommand) { is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) - output.actualSpeed = 0.0 } is SimResourceCommand.Consume -> { val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) // Ignore idle computation @@ -211,7 +147,7 @@ public class SimResourceDistributorMaxMin( output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed - // The duration that we want to run is that of the shortest request from an output + // The duration that we want to run is that of the shortest request of an output duration = min(duration, command.work / grantedSpeed) } SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } @@ -220,67 +156,23 @@ public class SimResourceDistributorMaxMin( assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } - this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = maxUsage - availableSpeed - this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)) + this.totalAllocatedSpeed = capacity - availableSpeed + val totalAllocatedWork = min( + totalRequestedWork, + totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) + ) return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline) else SimResourceCommand.Idle(deadline) } - /** - * Obtain the next command to perform. - */ - private fun doNext(capacity: Double): SimResourceCommand { - val totalRequestedWork = totalRequestedWork.toLong() - val totalAllocatedWork = totalAllocatedWork.toLong() - val totalRemainingWork = totalRemainingWork.toLong() - val totalRequestedSpeed = totalRequestedSpeed - val totalAllocatedSpeed = totalAllocatedSpeed - - // Force all inputs to re-schedule their work. - val command = doSchedule(capacity) - - val now = interpreter.clock.millis() - if (lastReport < now) { - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork, - totalAllocatedWork - totalRemainingWork, - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalAllocatedSpeed, - totalRequestedSpeed - ) - lastReport = now + private fun updateCapacity(ctx: SimResourceContext) { + for (output in _outputs) { + output.capacity = ctx.capacity } - - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - return command - } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) } /** @@ -322,7 +214,7 @@ public class SimResourceDistributorMaxMin( interpreter.batch { ctx.start() // Interrupt the input to re-schedule the resources - input.interrupt() + this@SimResourceDistributorMaxMin.ctx?.interrupt() } } @@ -338,8 +230,6 @@ public class SimResourceDistributorMaxMin( /* SimResourceProviderLogic */ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - reportOvercommit(ctx.remainingWork) - allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) lastCommandTimestamp = ctx.clock.millis() @@ -348,8 +238,6 @@ public class SimResourceDistributorMaxMin( } override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - reportOvercommit(ctx.remainingWork) - allowedSpeed = ctx.speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) lastCommandTimestamp = ctx.clock.millis() @@ -357,31 +245,25 @@ public class SimResourceDistributorMaxMin( return Long.MAX_VALUE } - override fun onFinish(ctx: SimResourceControllableContext) { - reportOvercommit(ctx.remainingWork) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + override fun onFinish(ctx: SimResourceControllableContext) { activeCommand = SimResourceCommand.Exit lastCommandTimestamp = ctx.clock.millis() } override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 + val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - // Compute the remaining amount of work return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM + // Compute the fraction of compute time allocated to the output val fraction = actualSpeed / totalAllocatedSpeed - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) + // Compute the work that was actually granted to the output. + val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction + max(0.0, work - processingAvailable) } else { 0.0 } @@ -399,9 +281,5 @@ public class SimResourceDistributorMaxMin( ctx.flush() } } - - private fun reportOvercommit(remainingWork: Double) { - totalOvercommittedWork += remainingWork - } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 2f567a5e..f709ca17 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,7 +27,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException /** - * A [SimResourceProvider] provides some resource of type [R]. + * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. */ public interface SimResourceProvider : AutoCloseable { /** @@ -35,6 +35,26 @@ public interface SimResourceProvider : AutoCloseable { */ public val state: SimResourceState + /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The current processing speed of the resource. + */ + public val speed: Double + + /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** + * The resource counters to track the execution metrics of the resource. + */ + public val counters: SimResourceCounters + /** * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. * diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 22676984..5231ecf5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -29,7 +29,7 @@ import kotlin.math.max */ public interface SimResourceProviderLogic { /** - * This method is invoked when the resource will idle until the specified [deadline]. + * This method is invoked when the resource is reported to idle until the specified [deadline]. * * @param ctx The context in which the provider runs. * @param deadline The deadline that was requested by the resource consumer. @@ -38,8 +38,8 @@ public interface SimResourceProviderLogic { public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. + * This method is invoked when the resource will be consumed until the specified amount of [work] was processed + * or [deadline] is reached. * * @param ctx The context in which the provider runs. * @param work The amount of work that was requested by the resource consumer. @@ -49,6 +49,14 @@ public interface SimResourceProviderLogic { */ public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long + /** + * This method is invoked when the progress of the resource consumer is materialized. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + */ + public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {} + /** * This method is invoked when the resource consumer has finished. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index d984d2a5..9f062cc3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -37,21 +37,6 @@ public class SimResourceSource( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null ) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { - /** - * The current processing speed of the resource. - */ - public val speed: Double - get() = ctx?.speed ?: 0.0 - - /** - * The capacity of the resource. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { @@ -62,6 +47,10 @@ public class SimResourceSource( return min(deadline, ctx.clock.millis() + getDuration(work, speed)) } + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + override fun onFinish(ctx: SimResourceControllableContext) { cancel() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index 53fec16a..e224285e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable { public val inputs: Set /** - * Add an output to the switch with the specified [capacity]. + * The resource counters to track the execution metrics of all switch resources. */ - public fun addOutput(capacity: Double): SimResourceProvider + public val counters: SimResourceCounters + + /** + * Create a new output on the switch. + */ + public fun newOutput(): SimResourceProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 7f1bb2b7..2950af80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override val inputs: Set get() = _inputs - override fun addOutput(capacity: Double): SimResourceProvider { + override val counters: SimResourceCounters = object : SimResourceCounters { + override val demand: Double + get() = _inputs.sumOf { it.counters.demand } + override val actual: Double + get() = _inputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _inputs.sumOf { it.counters.overcommit } + + override fun reset() { + for (input in _inputs) { + input.counters.reset() + } + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(capacity, forwarder) + val output = Provider(forwarder) _outputs += output return output } @@ -84,10 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { _inputs.forEach(SimResourceProvider::cancel) } - private inner class Provider( - private val capacity: Double, - private val forwarder: SimResourceTransformer - ) : SimResourceProvider by forwarder { + private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. _outputs -= this diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 61887e34..684a1b52 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -28,16 +28,25 @@ package org.opendc.simulator.resources */ public class SimResourceSwitchMaxMin( interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null, - private val listener: Listener? = null + parent: SimResourceSystem? = null ) : SimResourceSwitch { - private val _outputs = mutableSetOf() + /** + * The output resource providers to which resource consumers can be attached. + */ override val outputs: Set - get() = _outputs + get() = distributor.outputs - private val _inputs = mutableSetOf() + /** + * The input resources that will be switched between the output providers. + */ override val inputs: Set - get() = _inputs + get() = aggregator.inputs + + /** + * The resource counters to track the execution metrics of all switch resources. + */ + override val counters: SimResourceCounters + get() = aggregator.counters /** * A flag to indicate that the switch was closed. @@ -52,32 +61,19 @@ public class SimResourceSwitchMaxMin( /** * The distributor to distribute the aggregated resources. */ - private val distributor = SimResourceDistributorMaxMin( - aggregator.output, interpreter, parent, - object : SimResourceDistributorMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } - } - ) + private val distributor = SimResourceDistributorMaxMin(interpreter, parent) + + init { + aggregator.startConsumer(distributor) + } /** - * Add an output to the switch with the specified [capacity]. + * Add an output to the switch. */ - override fun addOutput(capacity: Double): SimResourceProvider { + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = distributor.addOutput(capacity) - _outputs.add(provider) - return provider + return distributor.newOutput() } /** @@ -92,26 +88,7 @@ public class SimResourceSwitchMaxMin( override fun close() { if (!isClosed) { isClosed = true - distributor.close() aggregator.close() } } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 32f3f573..fd3d1230 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + /** * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. * @@ -53,6 +55,19 @@ public class SimResourceTransformer( override var state: SimResourceState = SimResourceState.Pending private set + override val capacity: Double + get() = ctx?.capacity ?: 0.0 + + override val speed: Double + get() = ctx?.speed ?: 0.0 + + override val demand: Double + get() = ctx?.demand ?: 0.0 + + override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } @@ -97,10 +112,15 @@ public class SimResourceTransformer( start() } + updateCounters(ctx) + return if (state == SimResourceState.Stopped) { SimResourceCommand.Exit } else if (delegate != null) { val command = transform(ctx, delegate.onNext(ctx)) + + _work = if (command is SimResourceCommand.Consume) command.work else 0.0 + if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards @@ -169,6 +189,22 @@ public class SimResourceTransformer( state = SimResourceState.Pending } } + + /** + * Counter to track the current submitted work. + */ + private var _work = 0.0 + + /** + * Update the resource counters for the transformer. + */ + private fun updateCounters(ctx: SimResourceContext) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += _work + counters.actual += _work - remainingWork + counters.overcommit += remainingWork + } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 0b3f5de1..46c5c63f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -86,6 +86,26 @@ internal class SimResourceContextImpl( get() = _speed private var _speed = 0.0 + /** + * The current resource processing demand. + */ + override val demand: Double + get() = _limit + + private val counters = object : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + /** * The current state of the resource context. */ @@ -145,7 +165,7 @@ internal class SimResourceContextImpl( return } - doUpdate(clock.millis()) + interpreter.scheduleSync(this) } /** @@ -206,6 +226,11 @@ internal class SimResourceContextImpl( val remainingWork = remainingWork val isConsume = _limit > 0.0 + // Update the resource counters only if there is some progress + if (timestamp > _timestamp) { + logic.onUpdate(this, _work) + } + // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource capacity cannot satisfy the demand. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt new file mode 100644 index 00000000..827019c5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.SimResourceCounters + +/** + * Mutable implementation of the [SimResourceCounters] interface. + */ +internal class SimResourceCountersImpl : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt index d09e1b45..cb0d6160 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -60,6 +60,11 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, */ private val futureInvocations = ArrayDeque() + /** + * The systems that have been visited during the interpreter cycle. + */ + private val visited = linkedSetOf() + /** * The index in the batch stack. */ @@ -94,6 +99,30 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } } + /** + * Update the specified [ctx] synchronously. + */ + fun scheduleSync(ctx: SimResourceContextImpl) { + ctx.doUpdate(clock.millis()) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + /** * Schedule the interpreter to run at [timestamp] to update the resource contexts. * @@ -148,7 +177,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations - val visited = linkedSetOf() + val visited = visited // Execute all scheduled updates at current timestamp while (true) { @@ -183,6 +212,8 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, for (system in visited) { system.onConverge(now) } + + visited.clear() } while (queue.isNotEmpty()) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 994ae888..51024e80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -59,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest { source.startConsumer(adapter) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertAll( @@ -67,7 +67,7 @@ internal class SimResourceAggregatorMaxMinTest { { assertEquals(listOf(0.0, 0.5, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @@ -87,14 +87,14 @@ internal class SimResourceAggregatorMaxMinTest { val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(adapter) + aggregator.consume(adapter) yield() assertAll( { assertEquals(1000, clock.millis()) }, { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @@ -115,13 +115,13 @@ internal class SimResourceAggregatorMaxMinTest { .andThen(SimResourceCommand.Exit) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertEquals(1000, clock.millis()) verify(exactly = 2) { consumer.onNext(any()) } } finally { - aggregator.output.close() + aggregator.close() } } @@ -142,11 +142,11 @@ internal class SimResourceAggregatorMaxMinTest { .andThenThrows(IllegalStateException("Test Exception")) try { - assertThrows { aggregator.output.consume(consumer) } + assertThrows { aggregator.consume(consumer) } yield() assertEquals(SimResourceState.Pending, sources[0].state) } finally { - aggregator.output.close() + aggregator.close() } } @@ -164,14 +164,14 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(4.0, 1.0) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(1000) sources[0].capacity = 0.5 } yield() assertEquals(2334, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() } } @@ -189,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(1.0, 0.5) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(500) sources[0].capacity = 0.5 } yield() assertEquals(1000, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() + } + } + + @Test + fun testCounters() = runBlockingSimulation { + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(scheduler) + val sources = listOf( + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } + } finally { + aggregator.close() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 517dcb36..ad8d82e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -67,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest { source.startConsumer(adapter) switch.addInput(forwarder) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -98,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -142,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -170,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - switch.addOutput(3200.0) - assertThrows { switch.addOutput(3200.0) } + switch.newOutput() + assertThrows { switch.newOutput() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 0b023878..e4292ec0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -47,7 +47,7 @@ internal class SimResourceSwitchMaxMinTest { val sources = List(2) { SimResourceSource(2000.0, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(1000.0) + val provider = switch.newOutput() val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit @@ -67,26 +67,6 @@ internal class SimResourceSwitchMaxMinTest { fun testOvercommittedSingle() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - val duration = 5 * 60L val workload = SimTraceConsumer( @@ -98,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(scheduler, null, listener) - val provider = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val provider = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -110,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest { } assertAll( - { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -124,26 +104,6 @@ internal class SimResourceSwitchMaxMinTest { fun testOvercommittedDual() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - val duration = 5 * 60L val workloadA = SimTraceConsumer( @@ -164,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(scheduler, null, listener) - val providerA = switch.addOutput(3200.0) - val providerB = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val providerA = switch.newOutput() + val providerB = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -181,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest { switch.close() } assertAll( - { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 04886399..810052b8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -206,4 +206,21 @@ internal class SimResourceTransformerTest { assertEquals(0, clock.millis()) verify(exactly = 1) { consumer.onNext(any()) } } + + @Test + fun testCounters() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) + + val consumer = SimWorkConsumer(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } } -- cgit v1.2.3 From 1ee02377aca356a99506ac247b376d7cf070048d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 2 Jun 2021 23:06:36 +0200 Subject: simulator: Start consumers directly from workload This change updates the SimWorkload interfaces to allow implementations to start consumers for the machine resource providers directly. --- .../simulator/compute/SimAbstractHypervisor.kt | 115 +++------------------ .../opendc/simulator/compute/SimAbstractMachine.kt | 104 +++++++++++++------ .../simulator/compute/SimBareMetalMachine.kt | 51 ++------- .../opendc/simulator/compute/SimMachineContext.kt | 13 ++- .../simulator/compute/util/SimWorkloadLifecycle.kt | 76 ++++++++++++++ .../simulator/compute/workload/SimFlopsWorkload.kt | 12 +-- .../compute/workload/SimRuntimeWorkload.kt | 14 +-- .../simulator/compute/workload/SimTraceWorkload.kt | 55 +++++----- .../simulator/compute/workload/SimWorkload.kt | 9 +- 9 files changed, 228 insertions(+), 221 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 2b84a17c..68ecc49f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -22,17 +22,10 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.launch import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* import org.opendc.simulator.resources.SimResourceSwitch -import java.time.Clock /** * Abstract implementation of the [SimHypervisor] interface. @@ -86,117 +79,37 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource * @property performanceInterferenceModel The performance interference model to utilize. */ private inner class VirtualMachine( - override val model: SimMachineModel, + model: SimMachineModel, val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimMachine { - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ - override val usage: MutableStateFlow = MutableStateFlow(0.0) - - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - + ) : SimAbstractMachine(interpreter, parent = null, model) { /** * The vCPUs of the machine. */ - private val cpus = model.cpus.map { ProcessingUnitImpl(it, switch) } - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map) { - coroutineScope { - require(!isTerminated) { "Machine is terminated" } + override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) } - val ctx = object : SimMachineContext { - override val cpus: List = this@VirtualMachine.cpus - - override val memory: List - get() = model.memory - - override val clock: Clock - get() = this@SimAbstractHypervisor.context.clock - - override val meta: Map = meta - } - - interpreter.batch { - workload.onStart(ctx) - - for (cpu in cpus) { - launch { - cpu.consume(workload.getConsumer(ctx, cpu.model)) - } - } - } - } - } - - /** - * Terminate this VM instance. - */ override fun close() { - if (!isTerminated) { - isTerminated = true + super.close() - cpus.forEach(SimProcessingUnit::close) - _vms.remove(this) - } + _vms.remove(this) } } override fun onStart(ctx: SimMachineContext) { context = ctx switch = createSwitch(ctx) - } - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - val forwarder = SimResourceForwarder() - switch.addInput(forwarder) - return forwarder + for (cpu in ctx.cpus) { + switch.addInput(cpu) + } } /** - * The [SimProcessingUnit] of this machine. + * A [SimProcessingUnit] of a virtual machine. */ - public inner class ProcessingUnitImpl(override val model: ProcessingUnit, switch: SimResourceSwitch) : SimProcessingUnit { - /** - * The actual resource supporting the processing unit. - */ - private val source = switch.newOutput() - - override val state: SimResourceState - get() = source.state - - override val capacity: Double - get() = source.capacity - - override val speed: Double - get() = source.speed - - override val demand: Double - get() = source.demand - - override val counters: SimResourceCounters - get() = source.counters - - override fun startConsumer(consumer: SimResourceConsumer) { - source.startConsumer(consumer) - } - - override fun interrupt() { - source.interrupt() - } - - override fun cancel() { - source.cancel() - } - - override fun close() { - source.close() - } + private class VCpu( + private val source: SimResourceProvider, + override val model: ProcessingUnit + ) : SimProcessingUnit, SimResourceProvider by source { + override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index d40cdac5..e12ac72b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -27,24 +27,27 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem -import org.opendc.simulator.resources.batch -import org.opendc.simulator.resources.consume -import java.time.Clock +import org.opendc.simulator.resources.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. * * @param interpreter The interpreter to manage the machine's resources. * @param parent The parent simulation system. + * @param model The model of the machine. */ public abstract class SimAbstractMachine( protected val interpreter: SimResourceInterpreter, - final override val parent: SimResourceSystem? + final override val parent: SimResourceSystem?, + final override val model: SimMachineModel ) : SimMachine, SimResourceSystem { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ private val _usage = MutableStateFlow(0.0) - override val usage: StateFlow + public final override val usage: StateFlow get() = _usage /** @@ -54,50 +57,67 @@ public abstract class SimAbstractMachine( get() = _speed private var _speed = doubleArrayOf() - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - /** * The resources allocated for this machine. */ protected abstract val cpus: List /** - * The execution context in which the workload runs. + * A flag to indicate that the machine is terminated. */ - private inner class Context(override val meta: Map) : SimMachineContext { - override val clock: Clock - get() = interpreter.clock - - override val cpus: List = this@SimAbstractMachine.cpus + private var isTerminated = false - override val memory: List = model.memory - } + /** + * The continuation to resume when the virtual machine workload has finished. + */ + private var cont: Continuation? = null /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload, meta: Map): Unit = coroutineScope { + override suspend fun run(workload: SimWorkload, meta: Map) { check(!isTerminated) { "Machine is terminated" } + check(cont == null) { "A machine cannot run concurrently" } + val ctx = Context(meta) // Before the workload starts, initialize the initial power draw _speed = DoubleArray(model.cpus.size) { 0.0 } updateUsage(0.0) - interpreter.batch { - workload.onStart(ctx) + return suspendCancellableCoroutine { cont -> + this.cont = cont + + // Cancel all cpus on cancellation + cont.invokeOnCancellation { + this.cont = null + interpreter.batch { + for (cpu in cpus) { + cpu.cancel() + } + } + } + + interpreter.batch { workload.onStart(ctx) } + } + } + + override fun close() { + if (isTerminated) { + return + } + + isTerminated = true + cancel() + interpreter.batch { for (cpu in cpus) { - val model = cpu.model - val consumer = workload.getConsumer(ctx, model) - launch { cpu.consume(consumer) } + cpu.close() } } } + /* SimResourceSystem */ override fun onConverge(timestamp: Long) { val totalCapacity = model.cpus.sumOf { it.frequency } val cpus = cpus @@ -117,12 +137,34 @@ public abstract class SimAbstractMachine( _usage.value = usage } - override fun close() { - if (isTerminated) { - return + /** + * Cancel the workload that is currently running on the machine. + */ + private fun cancel() { + interpreter.batch { + for (cpu in cpus) { + cpu.cancel() + } } - isTerminated = true - cpus.forEach(SimProcessingUnit::close) + val cont = cont + if (cont != null) { + this.cont = null + cont.resume(Unit) + } + } + + /** + * The execution context in which the workload runs. + */ + private inner class Context(override val meta: Map) : SimMachineContext { + override val interpreter: SimResourceInterpreter + get() = this@SimAbstractMachine.interpreter + + override val cpus: List = this@SimAbstractMachine.cpus + + override val memory: List = model.memory + + override fun close() = cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 082719e2..7a49f29a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -44,12 +44,14 @@ import org.opendc.simulator.resources.SimResourceInterpreter @OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( interpreter: SimResourceInterpreter, - override val model: SimMachineModel, + model: SimMachineModel, scalingGovernor: ScalingGovernor, scalingDriver: ScalingDriver, parent: SimResourceSystem? = null, -) : SimAbstractMachine(interpreter, parent) { - override val cpus: List = model.cpus.map { ProcessingUnitImpl(it) } +) : SimAbstractMachine(interpreter, parent, model) { + override val cpus: List = model.cpus.map { cpu -> + Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu) + } /** * Construct the [ScalingDriver.Logic] for this machine. @@ -81,43 +83,12 @@ public class SimBareMetalMachine( } /** - * The [SimProcessingUnit] of this machine. + * A [SimProcessingUnit] of a bare-metal machine. */ - public inner class ProcessingUnitImpl(override val model: ProcessingUnit) : SimProcessingUnit { - /** - * The actual resource supporting the processing unit. - */ - private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine) - - override val state: SimResourceState - get() = source.state - - override val capacity: Double - get() = source.capacity - - override val speed: Double - get() = source.speed - - override val demand: Double - get() = source.demand - - override val counters: SimResourceCounters - get() = source.counters - - override fun startConsumer(consumer: SimResourceConsumer) { - source.startConsumer(consumer) - } - - override fun interrupt() { - source.interrupt() - } - - override fun cancel() { - source.cancel() - } - - override fun close() { - source.close() - } + private class Cpu( + private val source: SimResourceProvider, + override val model: ProcessingUnit + ) : SimProcessingUnit, SimResourceProvider by source { + override fun toString(): String = "SimBareMetalMachine.Cpu[model=$model]" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index c2523a2a..5cbabc86 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -23,18 +23,18 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.MemoryUnit -import java.time.Clock +import org.opendc.simulator.resources.SimResourceInterpreter /** * A simulated execution context in which a bootable image runs. This interface represents the * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on * which the image runs. */ -public interface SimMachineContext { +public interface SimMachineContext : AutoCloseable { /** - * The virtual clock tracking simulation time. + * The resource interpreter that simulates the machine. */ - public val clock: Clock + public val interpreter: SimResourceInterpreter /** * The metadata associated with the context. @@ -50,4 +50,9 @@ public interface SimMachineContext { * The memory available on the machine */ public val memory: List + + /** + * Stop the workload. + */ + public override fun close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt new file mode 100644 index 00000000..43662d93 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.util + +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.SimResourceEvent + +/** + * A helper class to manage the lifecycle of a [SimWorkload] + */ +public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { + /** + * The resource consumers which represent the lifecycle of the workload. + */ + private val waiting = mutableSetOf() + + /** + * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + */ + public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer { + waiting.add(consumer) + return object : SimResourceConsumer by consumer { + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + try { + consumer.onEvent(ctx, event) + } finally { + if (event == SimResourceEvent.Exit) { + complete(consumer) + } + } + } + + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + try { + consumer.onFailure(ctx, cause) + } finally { + complete(consumer) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" + } + } + + /** + * Complete the specified [SimResourceConsumer]. + */ + private fun complete(consumer: SimResourceConsumer) { + if (waiting.remove(consumer) && waiting.isEmpty()) { + ctx.close() + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 63c9d28c..de6832ca 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,8 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.compute.util.SimWorkloadLifecycle import org.opendc.simulator.resources.consumer.SimWorkConsumer /** @@ -43,10 +42,11 @@ public class SimFlopsWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimMachineContext) {} - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + for (cpu in ctx.cpus) { + cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization))) + } } override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index a3420e32..318a6b49 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,8 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.compute.util.SimWorkloadLifecycle import org.opendc.simulator.resources.consumer.SimWorkConsumer /** @@ -42,11 +41,12 @@ public class SimRuntimeWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimMachineContext) {} - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - val limit = cpu.frequency * utilization - return SimWorkConsumer((limit / 1000) * duration, utilization) + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + for (cpu in ctx.cpus) { + val limit = cpu.capacity * utilization + cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization))) + } } override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index ffb332d1..6929f4d2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.util.SimWorkloadLifecycle import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -44,33 +45,12 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa barrier = SimConsumerBarrier(ctx.cpus.size) fragment = nextFragment() - offset = ctx.clock.millis() - } - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - return object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val usage = fragment.usage / fragment.cores - val work = (fragment.duration / 1000) * usage - val deadline = offset + fragment.duration - - assert(deadline >= now) { "Deadline already passed" } - - val cmd = - if (cpu.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, usage, deadline) - else - SimResourceCommand.Idle(deadline) + offset = ctx.interpreter.clock.millis() - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration - } + val lifecycle = SimWorkloadLifecycle(ctx) - return cmd - } + for (cpu in ctx.cpus) { + cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model))) } } @@ -87,6 +67,31 @@ public class SimTraceWorkload(public val trace: Sequence) : SimWorkloa } } + private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val usage = fragment.usage / fragment.cores + val work = (fragment.duration / 1000) * usage + val deadline = offset + fragment.duration + + assert(deadline >= now) { "Deadline already passed" } + + val cmd = + if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, usage, deadline) + else + SimResourceCommand.Idle(deadline) + + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } + + return cmd + } + } + /** * A fragment of the workload. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index bdc12bb5..b80665fa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -23,8 +23,6 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer /** * A model that characterizes the runtime behavior of some particular workload. @@ -35,11 +33,8 @@ import org.opendc.simulator.resources.SimResourceConsumer public interface SimWorkload { /** * This method is invoked when the workload is started. + * + * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) - - /** - * Obtain the resource consumer for the specified processing unit. - */ - public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer } -- cgit v1.2.3 From 84468f4e3a331d7ea073ac7033b3d9937168ed9f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 2 Jun 2021 23:23:55 +0200 Subject: simulator: Split CPUFreq subsystem in compute simulator This change splits the functionality present in the CPUFreq subsystem of the compute simulation. Currently, the DVFS functionality is embedded in SimBareMetalMachine. However, this functionality should not exist within the firmware layer of a machine. Instead, the operating system should perform this logic (in OpenDC this should be the hypervisor). Furthermore, this change moves the scaling driver into the power package. The power driver is a machine/firmware specific implementation that computes the power consumption of a machine. --- .../simulator/compute/SimMachineBenchmarks.kt | 15 +-- .../simulator/compute/SimBareMetalMachine.kt | 29 +---- .../compute/cpufreq/PStateScalingDriver.kt | 86 -------------- .../simulator/compute/cpufreq/ScalingDriver.kt | 53 --------- .../compute/cpufreq/SimpleScalingDriver.kt | 49 -------- .../simulator/compute/power/PStatePowerDriver.kt | 60 ++++++++++ .../opendc/simulator/compute/power/PowerDriver.kt | 48 ++++++++ .../simulator/compute/power/SimplePowerDriver.kt | 39 ++++++ .../opendc/simulator/compute/SimHypervisorTest.kt | 11 +- .../org/opendc/simulator/compute/SimMachineTest.kt | 27 ++++- .../compute/SimSpaceSharedHypervisorTest.kt | 21 ++-- .../compute/cpufreq/PStateScalingDriverTest.kt | 132 --------------------- .../compute/power/PStatePowerDriverTest.kt | 125 +++++++++++++++++++ 13 files changed, 315 insertions(+), 380 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index bc21edc6..fb753de2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -25,12 +25,11 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation @@ -84,8 +83,7 @@ class SimMachineBenchmarks { fun benchmarkBareMetal(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace)) } @@ -95,8 +93,7 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) @@ -117,8 +114,7 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimFairShareHypervisor(interpreter) @@ -139,8 +135,7 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimFairShareHypervisor(interpreter) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 7a49f29a..c453cdf3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -22,10 +22,8 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* -import org.opendc.simulator.compute.cpufreq.ScalingDriver -import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.resources.* import org.opendc.simulator.resources.SimResourceInterpreter @@ -37,16 +35,13 @@ import org.opendc.simulator.resources.SimResourceInterpreter * * @param interpreter The [SimResourceInterpreter] to drive the simulation. * @param model The machine model to simulate. - * @param scalingGovernor The CPU frequency scaling governor to use. - * @param scalingDriver The CPU frequency scaling driver to use. + * @param powerDriver The power driver to use. * @param parent The parent simulation system. */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( interpreter: SimResourceInterpreter, model: SimMachineModel, - scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver, + powerDriver: PowerDriver, parent: SimResourceSystem? = null, ) : SimAbstractMachine(interpreter, parent, model) { override val cpus: List = model.cpus.map { cpu -> @@ -54,20 +49,9 @@ public class SimBareMetalMachine( } /** - * Construct the [ScalingDriver.Logic] for this machine. + * Construct the [PowerDriver.Logic] for this machine. */ - private val scalingDriver = scalingDriver.createLogic(this) - - /** - * The scaling contexts associated with each CPU. - */ - private val scalingGovernors = cpus.map { cpu -> - scalingGovernor.createLogic(this.scalingDriver.createContext(cpu)) - } - - init { - scalingGovernors.forEach { it.onStart() } - } + private val powerDriver = powerDriver.createLogic(this, cpus) /** * The power draw of the machine. @@ -78,8 +62,7 @@ public class SimBareMetalMachine( override fun updateUsage(usage: Double) { super.updateUsage(usage) - scalingGovernors.forEach { it.onLimit() } - powerDraw = scalingDriver.computePower() + powerDraw = powerDriver.computePower() } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt deleted file mode 100644 index 6f44d778..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.SimProcessingUnit -import org.opendc.simulator.compute.power.PowerModel -import java.util.* -import kotlin.math.max -import kotlin.math.min - -/** - * A [ScalingDriver] that scales the frequency of the processor based on a discrete set of frequencies. - * - * @param states A map describing the states of the driver. - */ -public class PStateScalingDriver(states: Map) : ScalingDriver { - /** - * The P-States defined by the user and ordered by key. - */ - private val states = TreeMap(states) - - override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic { - /** - * The scaling contexts. - */ - private val contexts = mutableListOf() - - override fun createContext(cpu: SimProcessingUnit): ScalingContext { - val ctx = ScalingContextImpl(machine, cpu) - contexts.add(ctx) - return ctx - } - - override fun computePower(): Double { - var targetFreq = 0.0 - var totalSpeed = 0.0 - - for (ctx in contexts) { - targetFreq = max(ctx.target, targetFreq) - totalSpeed += ctx.cpu.speed - } - - val maxFreq = states.lastKey() - val (actualFreq, model) = states.ceilingEntry(min(maxFreq, targetFreq)) - val utilization = totalSpeed / (actualFreq * contexts.size) - return model.computePower(utilization) - } - - override fun toString(): String = "PStateScalingDriver.Logic" - } - - private class ScalingContextImpl( - override val machine: SimMachine, - override val cpu: SimProcessingUnit - ) : ScalingContext { - var target = cpu.model.frequency - private set - - override fun setTarget(freq: Double) { - target = freq - } - - override fun toString(): String = "PStateScalingDriver.Context" - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt deleted file mode 100644 index b4fd7550..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.SimProcessingUnit - -/** - * A [ScalingDriver] is responsible for switching the processor to the correct frequency. - */ -public interface ScalingDriver { - /** - * Create the scaling logic for the specified [machine] - */ - public fun createLogic(machine: SimMachine): Logic - - /** - * The logic of the scaling driver. - */ - public interface Logic { - /** - * Create the [ScalingContext] for the specified [cpu] instance. - */ - public fun createContext(cpu: SimProcessingUnit): ScalingContext - - /** - * Compute the power consumption of the processor. - * - * @return The power consumption of the processor in W. - */ - public fun computePower(): Double - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt deleted file mode 100644 index cf0bbb28..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.SimProcessingUnit -import org.opendc.simulator.compute.power.PowerModel - -/** - * A [ScalingDriver] that ignores the instructions of the [ScalingGovernor] and directly computes the power consumption - * based on the specified [power model][model]. - */ -public class SimpleScalingDriver(private val model: PowerModel) : ScalingDriver { - override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic { - override fun createContext(cpu: SimProcessingUnit): ScalingContext { - return object : ScalingContext { - override val machine: SimMachine = machine - - override val cpu: SimProcessingUnit = cpu - - override fun setTarget(freq: Double) {} - } - } - - override fun computePower(): Double = model.computePower(machine.usage.value) - - override fun toString(): String = "SimpleScalingDriver.Logic" - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt new file mode 100644 index 00000000..6328c8e4 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.power + +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.SimProcessingUnit +import java.util.* +import kotlin.math.max +import kotlin.math.min + +/** + * A [PowerDriver] that computes the power draw using multiple [PowerModel]s based on multiple frequency states. + * + * @param states A map describing the states of the driver. + */ +public class PStatePowerDriver(states: Map) : PowerDriver { + /** + * The P-States defined by the user and ordered by key. + */ + private val states = TreeMap(states) + + override fun createLogic(machine: SimMachine, cpus: List): PowerDriver.Logic = object : PowerDriver.Logic { + override fun computePower(): Double { + var targetFreq = 0.0 + var totalSpeed = 0.0 + + for (cpu in cpus) { + targetFreq = max(cpu.capacity, targetFreq) + totalSpeed += cpu.speed + } + + val maxFreq = states.lastKey() + val (actualFreq, model) = states.ceilingEntry(min(maxFreq, targetFreq)) + val utilization = totalSpeed / (actualFreq * cpus.size) + return model.computePower(utilization) + } + + override fun toString(): String = "PStatePowerDriver.Logic" + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt new file mode 100644 index 00000000..a1a2b911 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.power + +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.SimProcessingUnit + +/** + * A [PowerDriver] is responsible for switching the processor to the correct frequency. + */ +public interface PowerDriver { + /** + * Create the scaling logic for the specified [machine] + */ + public fun createLogic(machine: SimMachine, cpus: List): Logic + + /** + * The logic of the scaling driver. + */ + public interface Logic { + /** + * Compute the power consumption of the processor. + * + * @return The power consumption of the processor in W. + */ + public fun computePower(): Double + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt new file mode 100644 index 00000000..5c5ceff5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.power + +import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.SimProcessingUnit + +/** + * A [PowerDriver] that computes the power consumption based on a single specified [power model][model]. + */ +public class SimplePowerDriver(private val model: PowerModel) : PowerDriver { + override fun createLogic(machine: SimMachine, cpus: List): PowerDriver.Logic = object : PowerDriver.Logic { + override fun computePower(): Double { + return model.computePower(machine.usage.value) + } + + override fun toString(): String = "SimplePowerDriver.Logic" + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 1709cc23..0bcfd9c6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -32,12 +32,11 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter @@ -95,7 +94,7 @@ internal class SimHypervisorTest { ) val platform = SimResourceInterpreter(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimFairShareHypervisor(platform, null, listener) launch { @@ -168,8 +167,7 @@ internal class SimHypervisorTest { val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - platform, model, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimFairShareHypervisor(platform, null, listener) @@ -210,8 +208,7 @@ internal class SimHypervisorTest { val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - platform, model, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimFairShareHypervisor(platform) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 7cc3c6dd..69f562d2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -29,12 +29,11 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter @@ -58,7 +57,11 @@ class SimMachineTest { @Test fun testFlopsWorkload() = runBlockingSimulation { - val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) @@ -77,7 +80,11 @@ class SimMachineTest { cpus = List(cpuNode.coreCount * 2) { ProcessingUnit(cpuNode, it % 2, 1000.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) @@ -91,7 +98,11 @@ class SimMachineTest { @Test fun testUsage() = runBlockingSimulation { - val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) val res = mutableListOf() val job = launch { machine.usage.toList(res) } @@ -108,7 +119,11 @@ class SimMachineTest { @Test fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine(SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) machine.close() assertDoesNotThrow { machine.close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index dd824557..dba3e9a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -30,12 +30,11 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -79,8 +78,7 @@ internal class SimSpaceSharedHypervisorTest { val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) @@ -116,8 +114,7 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimRuntimeWorkload(duration) val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) @@ -140,8 +137,7 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) @@ -162,8 +158,7 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L * 1000 val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) @@ -189,8 +184,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) @@ -214,8 +208,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val hypervisor = SimSpaceSharedHypervisor(interpreter) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt deleted file mode 100644 index bbea3ee2..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -import io.mockk.every -import io.mockk.mockk -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.SimProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.LinearPowerModel - -/** - * Test suite for [PStateScalingDriver]. - */ -internal class PStateScalingDriverTest { - @Test - fun testPowerWithoutGovernor() { - val machine = mockk() - - val driver = PStateScalingDriver( - sortedMapOf( - 2800.0 to ConstantPowerModel(200.0), - 3300.0 to ConstantPowerModel(300.0), - 3600.0 to ConstantPowerModel(350.0), - ) - ) - - val logic = driver.createLogic(machine) - assertEquals(200.0, logic.computePower()) - } - - @Test - fun testPowerWithSingleGovernor() { - val machine = mockk() - val cpu = mockk() - - every { cpu.model.frequency } returns 4100.0 - every { cpu.speed } returns 1200.0 - - val driver = PStateScalingDriver( - sortedMapOf( - 2800.0 to ConstantPowerModel(200.0), - 3300.0 to ConstantPowerModel(300.0), - 3600.0 to ConstantPowerModel(350.0), - ) - ) - - val logic = driver.createLogic(machine) - - val scalingContext = logic.createContext(cpu) - scalingContext.setTarget(3200.0) - - assertEquals(300.0, logic.computePower()) - } - - @Test - fun testPowerWithMultipleGovernors() { - val machine = mockk() - val cpu = mockk() - - every { cpu.model.frequency } returns 4100.0 - every { cpu.speed } returns 1200.0 - - val driver = PStateScalingDriver( - sortedMapOf( - 2800.0 to ConstantPowerModel(200.0), - 3300.0 to ConstantPowerModel(300.0), - 3600.0 to ConstantPowerModel(350.0), - ) - ) - - val logic = driver.createLogic(machine) - - val scalingContextA = logic.createContext(cpu) - scalingContextA.setTarget(1000.0) - - val scalingContextB = logic.createContext(cpu) - scalingContextB.setTarget(3400.0) - - assertEquals(350.0, logic.computePower()) - } - - @Test - fun testPowerBasedOnUtilization() { - val machine = mockk() - val cpu = mockk() - - every { cpu.model.frequency } returns 4200.0 - - val driver = PStateScalingDriver( - sortedMapOf( - 2800.0 to LinearPowerModel(200.0, 100.0), - 3300.0 to LinearPowerModel(250.0, 150.0), - 4000.0 to LinearPowerModel(300.0, 200.0), - ) - ) - - val logic = driver.createLogic(machine) - - val scalingContext = logic.createContext(cpu) - - every { cpu.speed } returns 1400.0 - scalingContext.setTarget(1400.0) - assertEquals(150.0, logic.computePower()) - - every { cpu.speed } returns 1400.0 - scalingContext.setTarget(4000.0) - assertEquals(235.0, logic.computePower()) - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt new file mode 100644 index 00000000..35fd7c4c --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.power + +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.SimProcessingUnit + +/** + * Test suite for [PStatePowerDriver]. + */ +internal class PStatePowerDriverTest { + @Test + fun testPowerBaseline() { + val machine = mockk() + + val driver = PStatePowerDriver( + sortedMapOf( + 2800.0 to ConstantPowerModel(200.0), + 3300.0 to ConstantPowerModel(300.0), + 3600.0 to ConstantPowerModel(350.0), + ) + ) + + val logic = driver.createLogic(machine, emptyList()) + assertEquals(200.0, logic.computePower()) + } + + @Test + fun testPowerWithSingleCpu() { + val machine = mockk() + val cpu = mockk() + + every { cpu.capacity } returns 3200.0 + every { cpu.speed } returns 1200.0 + + val driver = PStatePowerDriver( + sortedMapOf( + 2800.0 to ConstantPowerModel(200.0), + 3300.0 to ConstantPowerModel(300.0), + 3600.0 to ConstantPowerModel(350.0), + ) + ) + + val logic = driver.createLogic(machine, listOf(cpu)) + + assertEquals(300.0, logic.computePower()) + } + + @Test + fun testPowerWithMultipleCpus() { + val machine = mockk() + val cpus = listOf( + mockk(), + mockk() + ) + + every { cpus[0].capacity } returns 1000.0 + every { cpus[0].speed } returns 1200.0 + + every { cpus[1].capacity } returns 3500.0 + every { cpus[1].speed } returns 1200.0 + + val driver = PStatePowerDriver( + sortedMapOf( + 2800.0 to ConstantPowerModel(200.0), + 3300.0 to ConstantPowerModel(300.0), + 3600.0 to ConstantPowerModel(350.0), + ) + ) + + val logic = driver.createLogic(machine, cpus) + + assertEquals(350.0, logic.computePower()) + } + + @Test + fun testPowerBasedOnUtilization() { + val machine = mockk() + val cpu = mockk() + + every { cpu.model.frequency } returns 4200.0 + + val driver = PStatePowerDriver( + sortedMapOf( + 2800.0 to LinearPowerModel(200.0, 100.0), + 3300.0 to LinearPowerModel(250.0, 150.0), + 4000.0 to LinearPowerModel(300.0, 200.0), + ) + ) + + val logic = driver.createLogic(machine, listOf(cpu)) + + every { cpu.speed } returns 1400.0 + every { cpu.capacity } returns 1400.0 + assertEquals(150.0, logic.computePower()) + + every { cpu.speed } returns 1400.0 + every { cpu.capacity } returns 4000.0 + assertEquals(235.0, logic.computePower()) + } +} -- cgit v1.2.3 From cef12722f03a24a0e1e3b7502fb5e434d93f1664 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 3 Jun 2021 13:31:59 +0200 Subject: simulator: Migrate frequency scaling governors to OS layer This change moves the CPU frequency scaling governors from the bare-metal/firmware layer (SimBareMetalMachine) to the OS/Hypervisor layer (SimHypervisor) where it can make more informed decisions about the CPU frequency based on the load of the operating system or hypervisor. --- .../simulator/compute/SimAbstractHypervisor.kt | 54 ++++++++++++++++++---- .../simulator/compute/SimBareMetalMachine.kt | 11 ++++- .../simulator/compute/SimFairShareHypervisor.kt | 12 +++-- .../compute/SimFairShareHypervisorProvider.kt | 2 +- .../opendc/simulator/compute/SimProcessingUnit.kt | 5 ++ .../simulator/compute/SimSpaceSharedHypervisor.kt | 2 +- .../compute/cpufreq/DemandScalingGovernor.kt | 36 --------------- .../compute/cpufreq/PerformanceScalingGovernor.kt | 8 ++-- .../simulator/compute/cpufreq/ScalingContext.kt | 46 ------------------ .../simulator/compute/cpufreq/ScalingGovernor.kt | 10 ++-- .../opendc/simulator/compute/SimHypervisorTest.kt | 5 +- .../compute/cpufreq/DemandScalingGovernorTest.kt | 48 ------------------- .../cpufreq/PerformanceScalingGovernorTest.kt | 49 ++++++++++++++++++++ 13 files changed, 134 insertions(+), 154 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 68ecc49f..57c25b86 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.compute +import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* @@ -29,8 +30,14 @@ import org.opendc.simulator.resources.SimResourceSwitch /** * Abstract implementation of the [SimHypervisor] interface. + * + * @param interpreter The resource interpreter to use. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. */ -public abstract class SimAbstractHypervisor(private val interpreter: SimResourceInterpreter) : SimHypervisor { +public abstract class SimAbstractHypervisor( + private val interpreter: SimResourceInterpreter, + private val scalingGovernor: ScalingGovernor? +) : SimHypervisor { /** * The machine on which the hypervisor runs. */ @@ -48,6 +55,11 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource override val vms: Set get() = _vms + /** + * The scaling governors attached to the physical CPUs backing this hypervisor. + */ + private val governors = mutableListOf() + /** * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. */ @@ -58,6 +70,16 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource */ public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean + /** + * Trigger the governors to recompute the scaling limits. + */ + protected fun triggerGovernors(load: Double) { + for (governor in governors) { + governor.onLimit(load) + } + } + + /* SimHypervisor */ override fun canFit(model: SimMachineModel): Boolean { return canFit(model, switch) } @@ -72,6 +94,21 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource return vm } + /* SimWorkload */ + override fun onStart(ctx: SimMachineContext) { + context = ctx + switch = createSwitch(ctx) + + for (cpu in ctx.cpus) { + val governor = scalingGovernor?.createLogic(cpu) + if (governor != null) { + governors.add(governor) + governor.onStart() + } + switch.addInput(cpu) + } + } + /** * A virtual machine running on the hypervisor. * @@ -94,15 +131,6 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource } } - override fun onStart(ctx: SimMachineContext) { - context = ctx - switch = createSwitch(ctx) - - for (cpu in ctx.cpus) { - switch.addInput(cpu) - } - } - /** * A [SimProcessingUnit] of a virtual machine. */ @@ -110,6 +138,12 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource private val source: SimResourceProvider, override val model: ProcessingUnit ) : SimProcessingUnit, SimResourceProvider by source { + override var capacity: Double + get() = source.capacity + set(_) { + // Ignore capacity changes + } + override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index c453cdf3..5d5d1e5a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -69,9 +69,18 @@ public class SimBareMetalMachine( * A [SimProcessingUnit] of a bare-metal machine. */ private class Cpu( - private val source: SimResourceProvider, + private val source: SimResourceSource, override val model: ProcessingUnit ) : SimProcessingUnit, SimResourceProvider by source { + override var capacity: Double + get() = source.capacity + set(value) { + // Clamp the capacity of the CPU between [0.0, maxFreq] + if (value >= 0.0 && value <= model.frequency) { + source.capacity = value + } + } + override fun toString(): String = "SimBareMetalMachine.Cpu[model=$model]" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 3ceb3291..e7776c81 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.compute +import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSwitch @@ -34,21 +35,23 @@ import org.opendc.simulator.resources.SimResourceSystem * * @param interpreter The interpreter to manage the machine's resources. * @param parent The parent simulation system. + * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. * @param listener The hypervisor listener to use. */ public class SimFairShareHypervisor( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null, + scalingGovernor: ScalingGovernor? = null, private val listener: SimHypervisor.Listener? = null -) : SimAbstractHypervisor(interpreter) { +) : SimAbstractHypervisor(interpreter, scalingGovernor) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SwitchSystem().switch + return SwitchSystem(ctx).switch } - private inner class SwitchSystem : SimResourceSystem { + private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem { val switch = SimResourceSwitchMaxMin(interpreter, this) override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent @@ -82,6 +85,9 @@ public class SimFairShareHypervisor( lastDemand = counters.demand lastActual = counters.actual lastOvercommit = counters.overcommit + + val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency } + triggerGovernors(load) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index d3206196..94c905b2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -35,5 +35,5 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { interpreter: SimResourceInterpreter, parent: SimResourceSystem?, listener: SimHypervisor.Listener? - ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener) + ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt index 136a543a..93c9ddfa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt @@ -29,6 +29,11 @@ import org.opendc.simulator.resources.SimResourceProvider * A simulated processing unit. */ public interface SimProcessingUnit : SimResourceProvider { + /** + * The capacity of the processing unit, which can be adjusted by the workload if supported by the machine. + */ + public override var capacity: Double + /** * The model representing the static properties of the processing unit. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index afb47872..f6ae18f7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.resources.SimResourceSwitchExclusive /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) { +public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt deleted file mode 100644 index ddbe1ca0..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -/** - * A CPUFreq [ScalingGovernor] that requests the frequency based on the utilization of the machine. - */ -public class DemandScalingGovernor : ScalingGovernor { - override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { - override fun onLimit() { - ctx.setTarget(ctx.cpu.speed) - } - - override fun toString(): String = "DemandScalingGovernor.Logic" - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt index 96f8775a..245877be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt @@ -22,13 +22,15 @@ package org.opendc.simulator.compute.cpufreq +import org.opendc.simulator.compute.SimProcessingUnit + /** * A CPUFreq [ScalingGovernor] that causes the highest possible frequency to be requested from the resource. */ public class PerformanceScalingGovernor : ScalingGovernor { - override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { - override fun onLimit() { - ctx.setTarget(ctx.cpu.model.frequency) + override fun createLogic(cpu: SimProcessingUnit): ScalingGovernor.Logic = object : ScalingGovernor.Logic { + override fun onStart() { + cpu.capacity = cpu.model.frequency } override fun toString(): String = "PerformanceScalingGovernor.Logic" diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt deleted file mode 100644 index 18338079..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.SimProcessingUnit - -/** - * A [ScalingContext] is used to communicate frequency scaling changes between the [ScalingGovernor] and driver. - */ -public interface ScalingContext { - /** - * The machine the processing unit belongs to. - */ - public val machine: SimMachine - - /** - * The processing unit associated with this context. - */ - public val cpu: SimProcessingUnit - - /** - * Target the processor to run at the specified target [frequency][freq]. - */ - public fun setTarget(freq: Double) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt index c9aea580..b7e7ffc6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.cpufreq +import org.opendc.simulator.compute.SimProcessingUnit + /** * A [ScalingGovernor] in the CPUFreq subsystem of OpenDC is responsible for scaling the frequency of simulated CPUs * independent of the particular implementation of the CPU. @@ -33,9 +35,9 @@ package org.opendc.simulator.compute.cpufreq */ public interface ScalingGovernor { /** - * Create the scaling logic for the specified [context] + * Create the scaling logic for the specified [cpu] */ - public fun createLogic(ctx: ScalingContext): Logic + public fun createLogic(cpu: SimProcessingUnit): Logic /** * The logic of the scaling governor. @@ -48,7 +50,9 @@ public interface ScalingGovernor { /** * This method is invoked when the governor should re-decide the frequency limits. + * + * @param load The load of the system. */ - public fun onLimit() {} + public fun onLimit(load: Double) {} } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 0bcfd9c6..b15692ec 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit @@ -95,7 +96,7 @@ internal class SimHypervisorTest { val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null, listener) + val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener) launch { machine.run(hypervisor) @@ -169,7 +170,7 @@ internal class SimHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null, listener) + val hypervisor = SimFairShareHypervisor(platform, listener = listener) launch { machine.run(hypervisor) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt deleted file mode 100644 index c482d348..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.cpufreq - -import io.mockk.every -import io.mockk.mockk -import io.mockk.verify -import org.junit.jupiter.api.Test - -/** - * Test suite for the [DemandScalingGovernor] - */ -internal class DemandScalingGovernorTest { - @Test - fun testSetDemandLimit() { - val ctx = mockk(relaxUnitFun = true) - - every { ctx.cpu.speed } returns 2100.0 - - val logic = DemandScalingGovernor().createLogic(ctx) - - logic.onStart() - verify(exactly = 0) { ctx.setTarget(any()) } - - logic.onLimit() - verify(exactly = 1) { ctx.setTarget(2100.0) } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt new file mode 100644 index 00000000..8e8b09c8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.cpufreq + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimProcessingUnit + +/** + * Test suite for the [PerformanceScalingGovernor] + */ +internal class PerformanceScalingGovernorTest { + @Test + fun testSetStartLimit() { + val cpu = mockk(relaxUnitFun = true) + + every { cpu.model.frequency } returns 4100.0 + every { cpu.speed } returns 2100.0 + + val logic = PerformanceScalingGovernor().createLogic(cpu) + + logic.onStart() + logic.onLimit(1.0) + + verify(exactly = 1) { cpu.capacity = 4100.0 } + } +} -- cgit v1.2.3