diff options
65 files changed, 2313 insertions, 1697 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 68667a8c..d36717af 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -32,15 +32,16 @@ import org.opendc.compute.api.ServerState import org.opendc.compute.service.driver.* import org.opendc.simulator.compute.* import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.ScalingDriver import org.opendc.simulator.compute.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.failures.FailureDomain +import org.opendc.simulator.resources.SimResourceInterpreter import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -59,7 +60,7 @@ public class SimHost( meter: Meter, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver, + scalingDriver: PowerDriver, private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), ) : Host, FailureDomain, AutoCloseable { @@ -74,7 +75,7 @@ public class SimHost( hypervisor: SimHypervisorProvider, powerModel: PowerModel = ConstantPowerModel(0.0), mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimpleScalingDriver(powerModel), mapper) + ) : this(uid, name, model, meta, context, clock, meter, hypervisor, PerformanceScalingGovernor(), SimplePowerDriver(powerModel), mapper) /** * The [CoroutineScope] of the host bounded by the lifecycle of the host. @@ -94,19 +95,24 @@ public class SimHost( /** * Current total memory use of the images on this hypervisor. */ - private var availableMemory: Long = model.memory.map { it.size }.sum() + private var availableMemory: Long = model.memory.sumOf { it.size } + + /** + * The resource interpreter to schedule the resource interactions. + */ + private val interpreter = SimResourceInterpreter(context, clock) /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(context, clock, model, scalingGovernor, scalingDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model, scalingDriver) /** * The hypervisor to run multiple workloads. */ public val hypervisor: SimHypervisor = hypervisor.create( - scope.coroutineContext, clock, - object : SimHypervisor.Listener { + interpreter, + listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, requestedWork: Long, diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5594fd59..a6cff3ba 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -124,9 +124,18 @@ internal class SimHostTest { object : MetricExporter { override fun export(metrics: Collection<MetricData>): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } - requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() - grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() - overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() + val totalWork = metricsByName["cpu.work.total"] + if (totalWork != null) { + requestedWork += totalWork.doubleSummaryData.points.first().sum.toLong() + } + val grantedWorkCycle = metricsByName["cpu.work.granted"] + if (grantedWorkCycle != null) { + grantedWork += grantedWorkCycle.doubleSummaryData.points.first().sum.toLong() + } + val overcommittedWorkCycle = metricsByName["cpu.work.overcommit"] + if (overcommittedWorkCycle != null) { + overcommittedWork += overcommittedWorkCycle.doubleSummaryData.points.first().sum.toLong() + } return CompletableResultCode.ofSuccess() } @@ -160,8 +169,8 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(4197600, requestedWork, "Requested work does not match") }, - { assertEquals(2157600, grantedWork, "Granted work does not match") }, + { assertEquals(4147200, requestedWork, "Requested work does not match") }, + { assertEquals(2107200, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, { assertEquals(1500001, clock.millis()) } ) diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 7c7f0dad..0dade513 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -48,4 +48,6 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } + + testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 2d5cc68c..4b21b4f7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -113,8 +113,8 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) @@ -150,8 +150,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(96344616902, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(96324879442, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 7460a1e7..37e10580 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -173,23 +173,23 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { */ public enum class PowerModelType { CUBIC { - override val driver: ScalingDriver = SimpleScalingDriver(CubicPowerModel(206.0, 56.4)) + override val driver: PowerDriver = SimplePowerDriver(CubicPowerModel(206.0, 56.4)) }, LINEAR { - override val driver: ScalingDriver = SimpleScalingDriver(LinearPowerModel(206.0, 56.4)) + override val driver: PowerDriver = SimplePowerDriver(LinearPowerModel(206.0, 56.4)) }, SQRT { - override val driver: ScalingDriver = SimpleScalingDriver(SqrtPowerModel(206.0, 56.4)) + override val driver: PowerDriver = SimplePowerDriver(SqrtPowerModel(206.0, 56.4)) }, SQUARE { - override val driver: ScalingDriver = SimpleScalingDriver(SquarePowerModel(206.0, 56.4)) + override val driver: PowerDriver = SimplePowerDriver(SquarePowerModel(206.0, 56.4)) }, INTERPOLATION { - override val driver: ScalingDriver = SimpleScalingDriver( + override val driver: PowerDriver = SimplePowerDriver( InterpolationPowerModel( listOf(56.4, 100.0, 107.0, 117.0, 127.0, 138.0, 149.0, 162.0, 177.0, 191.0, 206.0) ) @@ -197,17 +197,17 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { }, MSE { - override val driver: ScalingDriver = SimpleScalingDriver(MsePowerModel(206.0, 56.4, 1.4)) + override val driver: PowerDriver = SimplePowerDriver(MsePowerModel(206.0, 56.4, 1.4)) }, ASYMPTOTIC { - override val driver: ScalingDriver = SimpleScalingDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false)) + override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, false)) }, ASYMPTOTIC_DVFS { - override val driver: ScalingDriver = SimpleScalingDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true)) + override val driver: PowerDriver = SimplePowerDriver(AsymptoticPowerModel(206.0, 56.4, 0.3, true)) }; - public abstract val driver: ScalingDriver + public abstract val driver: PowerDriver } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index f4c18ff1..001547ef 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -29,16 +29,12 @@ import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceCommand -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.resources.* import java.time.Clock import java.util.* import kotlin.coroutines.Continuation @@ -67,8 +63,8 @@ public class SimTFDevice( * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine( - scope.coroutineContext, clock, SimMachineModel(listOf(pu), listOf(memory)), - PerformanceScalingGovernor(), SimpleScalingDriver(powerModel) + SimResourceInterpreter(scope.coroutineContext, clock), SimMachineModel(listOf(pu), listOf(memory)), + SimplePowerDriver(powerModel) ) /** @@ -119,9 +115,11 @@ public class SimTFDevice( */ private var activeWork: Work? = null - override fun onStart(ctx: SimMachineContext) {} - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer = this + override fun onStart(ctx: SimMachineContext) { + for (cpu in ctx.cpus) { + cpu.startConsumer(this) + } + } override fun onNext(ctx: SimResourceContext): SimResourceCommand { val activeWork = activeWork diff --git a/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt b/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt index b5516b4d..32e5f75e 100644 --- a/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt +++ b/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt @@ -34,9 +34,9 @@ import org.opendc.serverless.simulator.workload.SimServerlessWorkloadMapper import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.resources.SimResourceInterpreter import java.time.Clock import java.util.ArrayDeque import kotlin.coroutines.Continuation @@ -72,7 +72,11 @@ public class SimFunctionDeployer( /** * The machine that will execute the workloads. */ - public val machine: SimMachine = SimBareMetalMachine(scope.coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + public val machine: SimMachine = SimBareMetalMachine( + SimResourceInterpreter(scope.coroutineContext, clock), + model, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) /** * The job associated with the lifecycle of the instance. diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 15714aca..fb753de2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -25,17 +25,15 @@ package org.opendc.simulator.compute import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceScheduler -import org.opendc.simulator.resources.SimResourceSchedulerTrampoline +import org.opendc.simulator.resources.SimResourceInterpreter import org.openjdk.jmh.annotations.* import java.util.concurrent.TimeUnit @@ -46,13 +44,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: SimResourceScheduler + private lateinit var interpreter: SimResourceInterpreter private lateinit var machineModel: SimMachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) + interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -85,8 +83,7 @@ class SimMachineBenchmarks { fun benchmarkBareMetal(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace)) } @@ -96,10 +93,9 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } @@ -118,10 +114,9 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(scheduler) + val hypervisor = SimFairShareHypervisor(interpreter) launch { machine.run(hypervisor) } @@ -140,10 +135,9 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(scheduler) + val hypervisor = SimFairShareHypervisor(interpreter) launch { machine.run(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 713376e7..57c25b86 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -22,21 +22,22 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.launch +import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.* -import java.time.Clock +import org.opendc.simulator.resources.SimResourceSwitch /** * Abstract implementation of the [SimHypervisor] interface. + * + * @param interpreter The resource interpreter to use. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. */ -public abstract class SimAbstractHypervisor : SimHypervisor { +public abstract class SimAbstractHypervisor( + private val interpreter: SimResourceInterpreter, + private val scalingGovernor: ScalingGovernor? +) : SimHypervisor { /** * The machine on which the hypervisor runs. */ @@ -55,6 +56,11 @@ public abstract class SimAbstractHypervisor : SimHypervisor { get() = _vms /** + * The scaling governors attached to the physical CPUs backing this hypervisor. + */ + private val governors = mutableListOf<ScalingGovernor.Logic>() + + /** * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. */ public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch @@ -64,6 +70,16 @@ public abstract class SimAbstractHypervisor : SimHypervisor { */ public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean + /** + * Trigger the governors to recompute the scaling limits. + */ + protected fun triggerGovernors(load: Double) { + for (governor in governors) { + governor.onLimit(load) + } + } + + /* SimHypervisor */ override fun canFit(model: SimMachineModel): Boolean { return canFit(model, switch) } @@ -78,6 +94,21 @@ public abstract class SimAbstractHypervisor : SimHypervisor { return vm } + /* SimWorkload */ + override fun onStart(ctx: SimMachineContext) { + context = ctx + switch = createSwitch(ctx) + + for (cpu in ctx.cpus) { + val governor = scalingGovernor?.createLogic(cpu) + if (governor != null) { + governors.add(governor) + governor.onStart() + } + switch.addInput(cpu) + } + } + /** * A virtual machine running on the hypervisor. * @@ -85,105 +116,34 @@ public abstract class SimAbstractHypervisor : SimHypervisor { * @property performanceInterferenceModel The performance interference model to utilize. */ private inner class VirtualMachine( - override val model: SimMachineModel, + model: SimMachineModel, val performanceInterferenceModel: PerformanceInterferenceModel? = null, - ) : SimMachine { - /** - * A [StateFlow] representing the CPU usage of the simulated machine. - */ - override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - + ) : SimAbstractMachine(interpreter, parent = null, model) { /** * The vCPUs of the machine. */ - private val cpus = model.cpus.map { ProcessingUnitImpl(it, switch) } - - /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - coroutineScope { - require(!isTerminated) { "Machine is terminated" } - - val ctx = object : SimMachineContext { - override val cpus: List<SimProcessingUnit> = this@VirtualMachine.cpus - - override val memory: List<MemoryUnit> - get() = model.memory + override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) } - override val clock: Clock - get() = this@SimAbstractHypervisor.context.clock - - override val meta: Map<String, Any> = meta - } - - workload.onStart(ctx) - - for (cpu in cpus) { - launch { - cpu.consume(workload.getConsumer(ctx, cpu.model)) - } - } - } - } - - /** - * Terminate this VM instance. - */ override fun close() { - if (!isTerminated) { - isTerminated = true + super.close() - cpus.forEach(SimProcessingUnit::close) - _vms.remove(this) - } + _vms.remove(this) } } - override fun onStart(ctx: SimMachineContext) { - context = ctx - switch = createSwitch(ctx) - } - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - val forwarder = SimResourceForwarder() - switch.addInput(forwarder) - return forwarder - } - /** - * The [SimProcessingUnit] of this machine. + * A [SimProcessingUnit] of a virtual machine. */ - public inner class ProcessingUnitImpl(override val model: ProcessingUnit, switch: SimResourceSwitch) : SimProcessingUnit { - /** - * The actual resource supporting the processing unit. - */ - private val source = switch.addOutput(model.frequency) - - override val speed: Double = 0.0 /* TODO Implement */ - - override val state: SimResourceState - get() = source.state - - override fun startConsumer(consumer: SimResourceConsumer) { - source.startConsumer(consumer) - } - - override fun interrupt() { - source.interrupt() - } - - override fun cancel() { - source.cancel() - } + private class VCpu( + private val source: SimResourceProvider, + override val model: ProcessingUnit + ) : SimProcessingUnit, SimResourceProvider by source { + override var capacity: Double + get() = source.capacity + set(_) { + // Ignore capacity changes + } - override fun close() { - source.close() - } + override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index e501033a..e12ac72b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -27,17 +27,27 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.consume -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. + * + * @param interpreter The interpreter to manage the machine's resources. + * @param parent The parent simulation system. + * @param model The model of the machine. */ -public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine { +public abstract class SimAbstractMachine( + protected val interpreter: SimResourceInterpreter, + final override val parent: SimResourceSystem?, + final override val model: SimMachineModel +) : SimMachine, SimResourceSystem { + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ private val _usage = MutableStateFlow(0.0) - override val usage: StateFlow<Double> + public final override val usage: StateFlow<Double> get() = _usage /** @@ -48,67 +58,76 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine private var _speed = doubleArrayOf() /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - - /** - * The [CoroutineContext] to run in. - */ - protected abstract val context: CoroutineContext - - /** * The resources allocated for this machine. */ protected abstract val cpus: List<SimProcessingUnit> /** - * The execution context in which the workload runs. + * A flag to indicate that the machine is terminated. */ - private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val clock: Clock - get() = this@SimAbstractMachine.clock - - override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus + private var isTerminated = false - override val memory: List<MemoryUnit> = model.memory - } + /** + * The continuation to resume when the virtual machine workload has finished. + */ + private var cont: Continuation<Unit>? = null /** * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) { - require(!isTerminated) { "Machine is terminated" } - val ctx = Context(meta) - val totalCapacity = model.cpus.sumOf { it.frequency } + override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { + check(!isTerminated) { "Machine is terminated" } + check(cont == null) { "A machine cannot run concurrently" } - _speed = DoubleArray(model.cpus.size) { 0.0 } - var totalSpeed = 0.0 + val ctx = Context(meta) // Before the workload starts, initialize the initial power draw + _speed = DoubleArray(model.cpus.size) { 0.0 } updateUsage(0.0) - workload.onStart(ctx) + return suspendCancellableCoroutine { cont -> + this.cont = cont - for (cpu in cpus) { - val model = cpu.model - val consumer = workload.getConsumer(ctx, model) - val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed -> - val _speed = _speed - val _usage = _usage - - val oldSpeed = _speed[model.id] - _speed[model.id] = newSpeed - totalSpeed = totalSpeed - oldSpeed + newSpeed - - val newUsage = totalSpeed / totalCapacity - if (_usage.value != newUsage) { - updateUsage(totalSpeed / totalCapacity) + // Cancel all cpus on cancellation + cont.invokeOnCancellation { + this.cont = null + + interpreter.batch { + for (cpu in cpus) { + cpu.cancel() + } } } - launch { cpu.consume(adapter) } + interpreter.batch { workload.onStart(ctx) } + } + } + + override fun close() { + if (isTerminated) { + return + } + + isTerminated = true + cancel() + interpreter.batch { + for (cpu in cpus) { + cpu.close() + } + } + } + + /* SimResourceSystem */ + override fun onConverge(timestamp: Long) { + val totalCapacity = model.cpus.sumOf { it.frequency } + val cpus = cpus + var totalSpeed = 0.0 + for (cpu in cpus) { + _speed[cpu.model.id] = cpu.speed + totalSpeed += cpu.speed } + + updateUsage(totalSpeed / totalCapacity) } /** @@ -118,10 +137,34 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine _usage.value = usage } - override fun close() { - if (!isTerminated) { - isTerminated = true - cpus.forEach(SimProcessingUnit::close) + /** + * Cancel the workload that is currently running on the machine. + */ + private fun cancel() { + interpreter.batch { + for (cpu in cpus) { + cpu.cancel() + } + } + + val cont = cont + if (cont != null) { + this.cont = null + cont.resume(Unit) } } + + /** + * The execution context in which the workload runs. + */ + private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { + override val interpreter: SimResourceInterpreter + get() = this@SimAbstractMachine.interpreter + + override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus + + override val memory: List<MemoryUnit> = model.memory + + override fun close() = cancel() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 27ebba21..5d5d1e5a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -22,62 +22,36 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* -import org.opendc.simulator.compute.cpufreq.ScalingDriver -import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.resources.* -import org.opendc.utils.TimerScheduler -import java.time.Clock -import kotlin.coroutines.* +import org.opendc.simulator.resources.SimResourceInterpreter /** * A simulated bare-metal machine that is able to run a single workload. * * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For - * example. the class expects only a single concurrent call to [run]. + * example. The class expects only a single concurrent call to [run]. * - * @param context The [CoroutineContext] to run the simulated workload in. - * @param clock The virtual clock to track the simulation time. + * @param interpreter The [SimResourceInterpreter] to drive the simulation. * @param model The machine model to simulate. + * @param powerDriver The power driver to use. + * @param parent The parent simulation system. */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( - context: CoroutineContext, - private val clock: Clock, - override val model: SimMachineModel, - scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver -) : SimAbstractMachine(clock) { - /** - * The [Job] associated with this machine. - */ - private val scope = CoroutineScope(context + Job()) - - override val context: CoroutineContext = scope.coroutineContext - - /** - * The [TimerScheduler] to use for scheduling the interrupts. - */ - private val scheduler = SimResourceSchedulerTrampoline(this.context, clock) - - override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) } + interpreter: SimResourceInterpreter, + model: SimMachineModel, + powerDriver: PowerDriver, + parent: SimResourceSystem? = null, +) : SimAbstractMachine(interpreter, parent, model) { + override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu -> + Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu) + } /** - * Construct the [ScalingDriver.Logic] for this machine. + * Construct the [PowerDriver.Logic] for this machine. */ - private val scalingDriver = scalingDriver.createLogic(this) - - /** - * The scaling contexts associated with each CPU. - */ - private val scalingGovernors = cpus.map { cpu -> - scalingGovernor.createLogic(this.scalingDriver.createContext(cpu)) - } - - init { - scalingGovernors.forEach { it.onStart() } - } + private val powerDriver = powerDriver.createLogic(this, cpus) /** * The power draw of the machine. @@ -88,45 +62,25 @@ public class SimBareMetalMachine( override fun updateUsage(usage: Double) { super.updateUsage(usage) - scalingGovernors.forEach { it.onLimit() } - powerDraw = scalingDriver.computePower() - } - - override fun close() { - super.close() - - scope.cancel() + powerDraw = powerDriver.computePower() } /** - * The [SimProcessingUnit] of this machine. + * A [SimProcessingUnit] of a bare-metal machine. */ - public inner class ProcessingUnitImpl(override val model: ProcessingUnit) : SimProcessingUnit { - /** - * The actual resource supporting the processing unit. - */ - private val source = SimResourceSource(model.frequency, scheduler) - - override val speed: Double - get() = source.speed - - override val state: SimResourceState - get() = source.state - - override fun startConsumer(consumer: SimResourceConsumer) { - source.startConsumer(consumer) - } - - override fun interrupt() { - source.interrupt() - } - - override fun cancel() { - source.cancel() - } - - override fun close() { - source.interrupt() - } + private class Cpu( + private val source: SimResourceSource, + override val model: ProcessingUnit + ) : SimProcessingUnit, SimResourceProvider by source { + override var capacity: Double + get() = source.capacity + set(value) { + // Clamp the capacity of the CPU between [0.0, maxFreq] + if (value >= 0.0 && value <= model.frequency) { + source.capacity = value + } + } + + override fun toString(): String = "SimBareMetalMachine.Cpu[model=$model]" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 11aec2de..e7776c81 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -22,35 +22,72 @@ package org.opendc.simulator.compute +import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.* +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSwitch +import org.opendc.simulator.resources.SimResourceSwitchMaxMin +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single * [SimBareMetalMachine] concurrently using weighted fair sharing. * + * @param interpreter The interpreter to manage the machine's resources. + * @param parent The parent simulation system. + * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { +public class SimFairShareHypervisor( + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null, + scalingGovernor: ScalingGovernor? = null, + private val listener: SimHypervisor.Listener? = null +) : SimAbstractHypervisor(interpreter, scalingGovernor) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SimResourceSwitchMaxMin( - scheduler, - object : SimResourceSwitchMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } + return SwitchSystem(ctx).switch + } + + private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem { + val switch = SimResourceSwitchMaxMin(interpreter, this) + + override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent + + private var lastCpuUsage = 0.0 + private var lastCpuDemand = 0.0 + private var lastDemand = 0.0 + private var lastActual = 0.0 + private var lastOvercommit = 0.0 + private var lastReport = Long.MIN_VALUE + + override fun onConverge(timestamp: Long) { + val listener = listener ?: return + val counters = switch.counters + + if (timestamp > lastReport) { + listener.onSliceFinish( + this@SimFairShareHypervisor, + (counters.demand - lastDemand).toLong(), + (counters.actual - lastActual).toLong(), + (counters.overcommit - lastOvercommit).toLong(), + 0L, + lastCpuUsage, + lastCpuDemand + ) } - ) + lastReport = timestamp + + lastCpuDemand = switch.inputs.sumOf { it.demand } + lastCpuUsage = switch.inputs.sumOf { it.speed } + lastDemand = counters.demand + lastActual = counters.actual + lastOvercommit = counters.overcommit + + val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency } + triggerGovernors(load) + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index 2ab3ea09..94c905b2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -22,9 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceSchedulerTrampoline -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -32,7 +31,9 @@ import kotlin.coroutines.CoroutineContext public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" - override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { - return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener) - } + override fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem?, + listener: SimHypervisor.Listener? + ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt index b66020f4..8e8c3698 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A service provider interface for constructing a [SimHypervisor]. @@ -40,5 +40,9 @@ public interface SimHypervisorProvider { /** * Create a [SimHypervisor] instance with the specified [listener]. */ - public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor + public fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null, + listener: SimHypervisor.Listener? = null + ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index c2523a2a..5cbabc86 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -23,18 +23,18 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.MemoryUnit -import java.time.Clock +import org.opendc.simulator.resources.SimResourceInterpreter /** * A simulated execution context in which a bootable image runs. This interface represents the * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on * which the image runs. */ -public interface SimMachineContext { +public interface SimMachineContext : AutoCloseable { /** - * The virtual clock tracking simulation time. + * The resource interpreter that simulates the machine. */ - public val clock: Clock + public val interpreter: SimResourceInterpreter /** * The metadata associated with the context. @@ -50,4 +50,9 @@ public interface SimMachineContext { * The memory available on the machine */ public val memory: List<MemoryUnit> + + /** + * Stop the workload. + */ + public override fun close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt index 13c7d9b2..93c9ddfa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt @@ -30,12 +30,12 @@ import org.opendc.simulator.resources.SimResourceProvider */ public interface SimProcessingUnit : SimResourceProvider { /** - * The model representing the static properties of the processing unit. + * The capacity of the processing unit, which can be adjusted by the workload if supported by the machine. */ - public val model: ProcessingUnit + public override var capacity: Double /** - * The current speed of the processing unit. + * The model representing the static properties of the processing unit. */ - public val speed: Double + public val model: ProcessingUnit } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt index fd8e546f..f6ae18f7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -22,12 +22,14 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.* +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSwitch +import org.opendc.simulator.resources.SimResourceSwitchExclusive /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor : SimAbstractHypervisor() { +public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean { return switch.inputs.size - switch.outputs.size >= model.cpus.size } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index 83b924d7..923b5bab 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import java.time.Clock -import kotlin.coroutines.CoroutineContext +import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -31,7 +31,9 @@ import kotlin.coroutines.CoroutineContext public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { - return SimSpaceSharedHypervisor() - } + override fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem?, + listener: SimHypervisor.Listener? + ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt index 96f8775a..245877be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt @@ -22,13 +22,15 @@ package org.opendc.simulator.compute.cpufreq +import org.opendc.simulator.compute.SimProcessingUnit + /** * A CPUFreq [ScalingGovernor] that causes the highest possible frequency to be requested from the resource. */ public class PerformanceScalingGovernor : ScalingGovernor { - override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { - override fun onLimit() { - ctx.setTarget(ctx.cpu.model.frequency) + override fun createLogic(cpu: SimProcessingUnit): ScalingGovernor.Logic = object : ScalingGovernor.Logic { + override fun onStart() { + cpu.capacity = cpu.model.frequency } override fun toString(): String = "PerformanceScalingGovernor.Logic" diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt index c9aea580..b7e7ffc6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.cpufreq +import org.opendc.simulator.compute.SimProcessingUnit + /** * A [ScalingGovernor] in the CPUFreq subsystem of OpenDC is responsible for scaling the frequency of simulated CPUs * independent of the particular implementation of the CPU. @@ -33,9 +35,9 @@ package org.opendc.simulator.compute.cpufreq */ public interface ScalingGovernor { /** - * Create the scaling logic for the specified [context] + * Create the scaling logic for the specified [cpu] */ - public fun createLogic(ctx: ScalingContext): Logic + public fun createLogic(cpu: SimProcessingUnit): Logic /** * The logic of the scaling governor. @@ -48,7 +50,9 @@ public interface ScalingGovernor { /** * This method is invoked when the governor should re-decide the frequency limits. + * + * @param load The load of the system. */ - public fun onLimit() {} + public fun onLimit(load: Double) {} } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt index 6f44d778..6328c8e4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt @@ -20,67 +20,41 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.cpufreq +package org.opendc.simulator.compute.power import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimProcessingUnit -import org.opendc.simulator.compute.power.PowerModel import java.util.* import kotlin.math.max import kotlin.math.min /** - * A [ScalingDriver] that scales the frequency of the processor based on a discrete set of frequencies. + * A [PowerDriver] that computes the power draw using multiple [PowerModel]s based on multiple frequency states. * * @param states A map describing the states of the driver. */ -public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDriver { +public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver { /** * The P-States defined by the user and ordered by key. */ private val states = TreeMap(states) - override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic { - /** - * The scaling contexts. - */ - private val contexts = mutableListOf<ScalingContextImpl>() - - override fun createContext(cpu: SimProcessingUnit): ScalingContext { - val ctx = ScalingContextImpl(machine, cpu) - contexts.add(ctx) - return ctx - } - + override fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): PowerDriver.Logic = object : PowerDriver.Logic { override fun computePower(): Double { var targetFreq = 0.0 var totalSpeed = 0.0 - for (ctx in contexts) { - targetFreq = max(ctx.target, targetFreq) - totalSpeed += ctx.cpu.speed + for (cpu in cpus) { + targetFreq = max(cpu.capacity, targetFreq) + totalSpeed += cpu.speed } val maxFreq = states.lastKey() val (actualFreq, model) = states.ceilingEntry(min(maxFreq, targetFreq)) - val utilization = totalSpeed / (actualFreq * contexts.size) + val utilization = totalSpeed / (actualFreq * cpus.size) return model.computePower(utilization) } - override fun toString(): String = "PStateScalingDriver.Logic" - } - - private class ScalingContextImpl( - override val machine: SimMachine, - override val cpu: SimProcessingUnit - ) : ScalingContext { - var target = cpu.model.frequency - private set - - override fun setTarget(freq: Double) { - target = freq - } - - override fun toString(): String = "PStateScalingDriver.Context" + override fun toString(): String = "PStatePowerDriver.Logic" } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt index b4fd7550..a1a2b911 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt @@ -20,30 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.cpufreq +package org.opendc.simulator.compute.power import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimProcessingUnit /** - * A [ScalingDriver] is responsible for switching the processor to the correct frequency. + * A [PowerDriver] is responsible for switching the processor to the correct frequency. */ -public interface ScalingDriver { +public interface PowerDriver { /** * Create the scaling logic for the specified [machine] */ - public fun createLogic(machine: SimMachine): Logic + public fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): Logic /** * The logic of the scaling driver. */ public interface Logic { /** - * Create the [ScalingContext] for the specified [cpu] instance. - */ - public fun createContext(cpu: SimProcessingUnit): ScalingContext - - /** * Compute the power consumption of the processor. * * @return The power consumption of the processor in W. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt index 18338079..5c5ceff5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt @@ -20,27 +20,20 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.cpufreq +package org.opendc.simulator.compute.power import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimProcessingUnit /** - * A [ScalingContext] is used to communicate frequency scaling changes between the [ScalingGovernor] and driver. + * A [PowerDriver] that computes the power consumption based on a single specified [power model][model]. */ -public interface ScalingContext { - /** - * The machine the processing unit belongs to. - */ - public val machine: SimMachine +public class SimplePowerDriver(private val model: PowerModel) : PowerDriver { + override fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): PowerDriver.Logic = object : PowerDriver.Logic { + override fun computePower(): Double { + return model.computePower(machine.usage.value) + } - /** - * The processing unit associated with this context. - */ - public val cpu: SimProcessingUnit - - /** - * Target the processor to run at the specified target [frequency][freq]. - */ - public fun setTarget(freq: Double) + override fun toString(): String = "SimplePowerDriver.Logic" + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt new file mode 100644 index 00000000..43662d93 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.util + +import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.SimResourceEvent + +/** + * A helper class to manage the lifecycle of a [SimWorkload] + */ +public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { + /** + * The resource consumers which represent the lifecycle of the workload. + */ + private val waiting = mutableSetOf<SimResourceConsumer>() + + /** + * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + */ + public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer { + waiting.add(consumer) + return object : SimResourceConsumer by consumer { + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + try { + consumer.onEvent(ctx, event) + } finally { + if (event == SimResourceEvent.Exit) { + complete(consumer) + } + } + } + + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + try { + consumer.onFailure(ctx, cause) + } finally { + complete(consumer) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" + } + } + + /** + * Complete the specified [SimResourceConsumer]. + */ + private fun complete(consumer: SimResourceConsumer) { + if (waiting.remove(consumer) && waiting.isEmpty()) { + ctx.close() + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 63c9d28c..de6832ca 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,8 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.compute.util.SimWorkloadLifecycle import org.opendc.simulator.resources.consumer.SimWorkConsumer /** @@ -43,10 +42,11 @@ public class SimFlopsWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimMachineContext) {} - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization) + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + for (cpu in ctx.cpus) { + cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization))) + } } override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index a3420e32..318a6b49 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,8 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.compute.util.SimWorkloadLifecycle import org.opendc.simulator.resources.consumer.SimWorkConsumer /** @@ -42,11 +41,12 @@ public class SimRuntimeWorkload( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - override fun onStart(ctx: SimMachineContext) {} - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - val limit = cpu.frequency * utilization - return SimWorkConsumer((limit / 1000) * duration, utilization) + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + for (cpu in ctx.cpus) { + val limit = cpu.capacity * utilization + cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization))) + } } override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index ffb332d1..6929f4d2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.util.SimWorkloadLifecycle import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -44,33 +45,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa barrier = SimConsumerBarrier(ctx.cpus.size) fragment = nextFragment() - offset = ctx.clock.millis() - } - - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer { - return object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val now = ctx.clock.millis() - val fragment = fragment ?: return SimResourceCommand.Exit - val usage = fragment.usage / fragment.cores - val work = (fragment.duration / 1000) * usage - val deadline = offset + fragment.duration - - assert(deadline >= now) { "Deadline already passed" } - - val cmd = - if (cpu.id < fragment.cores && work > 0.0) - SimResourceCommand.Consume(work, usage, deadline) - else - SimResourceCommand.Idle(deadline) + offset = ctx.interpreter.clock.millis() - if (barrier.enter()) { - this@SimTraceWorkload.fragment = nextFragment() - this@SimTraceWorkload.offset += fragment.duration - } + val lifecycle = SimWorkloadLifecycle(ctx) - return cmd - } + for (cpu in ctx.cpus) { + cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model))) } } @@ -87,6 +67,31 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa } } + private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + val now = ctx.clock.millis() + val fragment = fragment ?: return SimResourceCommand.Exit + val usage = fragment.usage / fragment.cores + val work = (fragment.duration / 1000) * usage + val deadline = offset + fragment.duration + + assert(deadline >= now) { "Deadline already passed" } + + val cmd = + if (cpu.id < fragment.cores && work > 0.0) + SimResourceCommand.Consume(work, usage, deadline) + else + SimResourceCommand.Idle(deadline) + + if (barrier.enter()) { + this@SimTraceWorkload.fragment = nextFragment() + this@SimTraceWorkload.offset += fragment.duration + } + + return cmd + } + } + /** * A fragment of the workload. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index bdc12bb5..b80665fa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -23,8 +23,6 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer /** * A model that characterizes the runtime behavior of some particular workload. @@ -35,11 +33,8 @@ import org.opendc.simulator.resources.SimResourceConsumer public interface SimWorkload { /** * This method is invoked when the workload is started. + * + * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) - - /** - * Obtain the resource consumer for the specified processing unit. - */ - public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 8886caa7..b15692ec 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -31,15 +31,16 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceSchedulerTrampoline +import org.opendc.simulator.resources.SimResourceInterpreter /** * Test suite for the [SimHypervisor] class. @@ -93,8 +94,9 @@ internal class SimHypervisorTest { ), ) - val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) + val platform = SimResourceInterpreter(coroutineContext, clock) + val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener) launch { machine.run(hypervisor) @@ -164,11 +166,11 @@ internal class SimHypervisorTest { ) ) + val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, model, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) + val hypervisor = SimFairShareHypervisor(platform, listener = listener) launch { machine.run(hypervisor) @@ -190,10 +192,33 @@ internal class SimHypervisorTest { yield() assertAll( - { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") }, { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, { assertEquals(1200000, clock.millis()) } ) } + + @Test + fun testMultipleCPUs() = runBlockingSimulation { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + val model = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + + val platform = SimResourceInterpreter(coroutineContext, clock) + val machine = SimBareMetalMachine( + platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) + ) + val hypervisor = SimFairShareHypervisor(platform) + + assertDoesNotThrow { + launch { + machine.run(hypervisor) + } + } + + machine.close() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 205f2eca..69f562d2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -29,14 +29,14 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter /** * Test suite for the [SimBareMetalMachine] class. @@ -57,7 +57,11 @@ class SimMachineTest { @Test fun testFlopsWorkload() = runBlockingSimulation { - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) @@ -76,7 +80,11 @@ class SimMachineTest { cpus = List(cpuNode.coreCount * 2) { ProcessingUnit(cpuNode, it % 2, 1000.0) }, memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) try { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) @@ -90,7 +98,11 @@ class SimMachineTest { @Test fun testUsage() = runBlockingSimulation { - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) val res = mutableListOf<Double>() val job = launch { machine.usage.toList(res) } @@ -99,7 +111,7 @@ class SimMachineTest { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) yield() job.cancel() - assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" } + assertEquals(listOf(0.0, 1.0, 0.0), res) { "Machine is fully utilized" } } finally { machine.close() } @@ -107,7 +119,11 @@ class SimMachineTest { @Test fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) + val machine = SimBareMetalMachine( + SimResourceInterpreter(coroutineContext, clock), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)) + ) machine.close() assertDoesNotThrow { machine.close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt index ef6f536d..dba3e9a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -30,16 +30,16 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceInterpreter /** * A test suite for the [SimSpaceSharedHypervisor]. @@ -76,11 +76,11 @@ internal class SimSpaceSharedHypervisorTest { ), ) + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) val colA = launch { machine.usage.toList(usagePm) } launch { machine.run(hypervisor) } @@ -112,11 +112,11 @@ internal class SimSpaceSharedHypervisorTest { fun testRuntimeWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -135,11 +135,11 @@ internal class SimSpaceSharedHypervisorTest { fun testFlopsWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -156,11 +156,11 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -182,11 +182,11 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() @@ -206,11 +206,11 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { + val interpreter = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine( - coroutineContext, clock, machineModel, PerformanceScalingGovernor(), - SimpleScalingDriver(ConstantPowerModel(0.0)) + interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor() + val hypervisor = SimSpaceSharedHypervisor(interpreter) launch { machine.run(hypervisor) } yield() diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt index c482d348..8e8b09c8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt @@ -26,23 +26,24 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimProcessingUnit /** - * Test suite for the [DemandScalingGovernor] + * Test suite for the [PerformanceScalingGovernor] */ -internal class DemandScalingGovernorTest { +internal class PerformanceScalingGovernorTest { @Test - fun testSetDemandLimit() { - val ctx = mockk<ScalingContext>(relaxUnitFun = true) + fun testSetStartLimit() { + val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true) - every { ctx.cpu.speed } returns 2100.0 + every { cpu.model.frequency } returns 4100.0 + every { cpu.speed } returns 2100.0 - val logic = DemandScalingGovernor().createLogic(ctx) + val logic = PerformanceScalingGovernor().createLogic(cpu) logic.onStart() - verify(exactly = 0) { ctx.setTarget(any()) } + logic.onLimit(1.0) - logic.onLimit() - verify(exactly = 1) { ctx.setTarget(2100.0) } + verify(exactly = 1) { cpu.capacity = 4100.0 } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt index bbea3ee2..35fd7c4c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.cpufreq +package org.opendc.simulator.compute.power import io.mockk.every import io.mockk.mockk @@ -28,18 +28,16 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.LinearPowerModel /** - * Test suite for [PStateScalingDriver]. + * Test suite for [PStatePowerDriver]. */ -internal class PStateScalingDriverTest { +internal class PStatePowerDriverTest { @Test - fun testPowerWithoutGovernor() { + fun testPowerBaseline() { val machine = mockk<SimBareMetalMachine>() - val driver = PStateScalingDriver( + val driver = PStatePowerDriver( sortedMapOf( 2800.0 to ConstantPowerModel(200.0), 3300.0 to ConstantPowerModel(300.0), @@ -47,19 +45,19 @@ internal class PStateScalingDriverTest { ) ) - val logic = driver.createLogic(machine) + val logic = driver.createLogic(machine, emptyList()) assertEquals(200.0, logic.computePower()) } @Test - fun testPowerWithSingleGovernor() { + fun testPowerWithSingleCpu() { val machine = mockk<SimBareMetalMachine>() val cpu = mockk<SimProcessingUnit>() - every { cpu.model.frequency } returns 4100.0 + every { cpu.capacity } returns 3200.0 every { cpu.speed } returns 1200.0 - val driver = PStateScalingDriver( + val driver = PStatePowerDriver( sortedMapOf( 2800.0 to ConstantPowerModel(200.0), 3300.0 to ConstantPowerModel(300.0), @@ -67,23 +65,26 @@ internal class PStateScalingDriverTest { ) ) - val logic = driver.createLogic(machine) - - val scalingContext = logic.createContext(cpu) - scalingContext.setTarget(3200.0) + val logic = driver.createLogic(machine, listOf(cpu)) assertEquals(300.0, logic.computePower()) } @Test - fun testPowerWithMultipleGovernors() { + fun testPowerWithMultipleCpus() { val machine = mockk<SimBareMetalMachine>() - val cpu = mockk<SimProcessingUnit>() + val cpus = listOf( + mockk<SimProcessingUnit>(), + mockk() + ) - every { cpu.model.frequency } returns 4100.0 - every { cpu.speed } returns 1200.0 + every { cpus[0].capacity } returns 1000.0 + every { cpus[0].speed } returns 1200.0 - val driver = PStateScalingDriver( + every { cpus[1].capacity } returns 3500.0 + every { cpus[1].speed } returns 1200.0 + + val driver = PStatePowerDriver( sortedMapOf( 2800.0 to ConstantPowerModel(200.0), 3300.0 to ConstantPowerModel(300.0), @@ -91,13 +92,7 @@ internal class PStateScalingDriverTest { ) ) - val logic = driver.createLogic(machine) - - val scalingContextA = logic.createContext(cpu) - scalingContextA.setTarget(1000.0) - - val scalingContextB = logic.createContext(cpu) - scalingContextB.setTarget(3400.0) + val logic = driver.createLogic(machine, cpus) assertEquals(350.0, logic.computePower()) } @@ -109,7 +104,7 @@ internal class PStateScalingDriverTest { every { cpu.model.frequency } returns 4200.0 - val driver = PStateScalingDriver( + val driver = PStatePowerDriver( sortedMapOf( 2800.0 to LinearPowerModel(200.0, 100.0), 3300.0 to LinearPowerModel(250.0, 150.0), @@ -117,16 +112,14 @@ internal class PStateScalingDriverTest { ) ) - val logic = driver.createLogic(machine) - - val scalingContext = logic.createContext(cpu) + val logic = driver.createLogic(machine, listOf(cpu)) every { cpu.speed } returns 1400.0 - scalingContext.setTarget(1400.0) + every { cpu.capacity } returns 1400.0 assertEquals(150.0, logic.computePower()) every { cpu.speed } returns 1400.0 - scalingContext.setTarget(4000.0) + every { cpu.capacity } returns 4000.0 assertEquals(235.0, logic.computePower()) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index cd5f33bd..b45b2a2f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimResourceBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: SimResourceScheduler + private lateinit var interpreter: SimResourceInterpreter @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) + interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) } @State(Scope.Thread) @@ -67,7 +67,7 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSource(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, scheduler) + val provider = SimResourceSource(4200.0, interpreter) return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -75,7 +75,7 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, scheduler) + val provider = SimResourceSource(4200.0, interpreter) val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) @@ -85,12 +85,12 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(scheduler) + val switch = SimResourceSwitchMaxMin(interpreter) - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -98,14 +98,14 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(scheduler) + val switch = SimResourceSwitchMaxMin(interpreter) - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - repeat(3) { i -> + repeat(3) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } @@ -117,10 +117,10 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -130,12 +130,12 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) repeat(2) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 653b53e0..5fe7d7bb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -25,7 +25,10 @@ package org.opendc.simulator.resources /** * Abstract implementation of [SimResourceAggregator]. */ -public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator { +public abstract class SimAbstractResourceAggregator( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? +) : SimResourceAggregator { /** * This method is invoked when the resource consumer consumes resources. */ @@ -39,7 +42,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe /** * This method is invoked when the resource consumer finishes processing. */ - protected abstract fun doFinish(cause: Throwable?) + protected abstract fun doFinish() /** * This method is invoked when an input context is started. @@ -51,8 +54,9 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe */ protected abstract fun onInputFinished(input: Input) + /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { - check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + check(state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) @@ -60,42 +64,75 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe input.startConsumer(consumer) } - override fun close() { - output.close() - } - - override val output: SimResourceProvider - get() = _output - private val _output = SimResourceForwarder() - override val inputs: Set<SimResourceProvider> get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() private val _inputConsumers = mutableListOf<Consumer>() - protected val outputContext: SimResourceContext - get() = context - private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) { - override val remainingWork: Double - get() { - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - _inputConsumers.sumOf { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it } - } else { - _remainingWork + /* SimResourceProvider */ + override val state: SimResourceState + get() = _output.state + + override val capacity: Double + get() = _output.capacity + + override val speed: Double + get() = _output.speed + + override val demand: Double + get() = _output.demand + + override val counters: SimResourceCounters + get() = _output.counters + + override fun startConsumer(consumer: SimResourceConsumer) { + _output.startConsumer(consumer) + } + + override fun cancel() { + _output.cancel() + } + + override fun interrupt() { + _output.interrupt() + } + + override fun close() { + _output.close() + } + + private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + doIdle(deadline) + return Long.MAX_VALUE } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + doConsume(work, limit, deadline) + return Long.MAX_VALUE + } - override fun onIdle(deadline: Long) = doIdle(deadline) + override fun onFinish(ctx: SimResourceControllableContext) { + doFinish() + } - override fun onFinish() { - doFinish(null) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return _inputConsumers.sumOf { it.remainingWork } + } + } + } + + /** + * Flush the progress of the output if possible. + */ + fun flush() { + ctx?.flush() } } @@ -123,7 +160,13 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe */ override val ctx: SimResourceContext get() = _ctx!! - var _ctx: SimResourceContext? = null + private var _ctx: SimResourceContext? = null + + /** + * The remaining work of the consumer. + */ + val remainingWork: Double + get() = _ctx?.remainingWork ?: 0.0 /** * The resource command to run next. @@ -132,7 +175,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe private fun updateCapacity() { // Adjust capacity of output resource - context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } + _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } } /* Input */ @@ -149,7 +192,8 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe this.command = null next } else { - context.flush(isIntermediate = true) + _output.flush() + next = command this.command = null next ?: SimResourceCommand.Idle() @@ -162,11 +206,6 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe _ctx = ctx updateCapacity() - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - onInputStarted(this) } SimResourceEvent.Capacity -> updateCapacity() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt deleted file mode 100644 index c03bfad5..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock -import kotlin.math.max -import kotlin.math.min - -/** - * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. - */ -public abstract class SimAbstractResourceContext( - initialCapacity: Double, - private val scheduler: SimResourceScheduler, - private val consumer: SimResourceConsumer -) : SimResourceContext, SimResourceFlushable { - - /** - * The clock of the context. - */ - public override val clock: Clock - get() = scheduler.clock - - /** - * The capacity of the resource. - */ - public final override var capacity: Double = initialCapacity - set(value) { - val oldValue = field - - // Only changes will be propagated - if (value != oldValue) { - field = value - onCapacityChange() - } - } - - /** - * The amount of work still remaining at this instant. - */ - override val remainingWork: Double - get() { - val activeCommand = activeCommand ?: return 0.0 - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(activeCommand, now).also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - /** - * A flag to indicate the state of the context. - */ - public var state: SimResourceState = SimResourceState.Pending - private set - - /** - * The current processing speed of the resource. - */ - final override var speed: Double = 0.0 - private set - - /** - * This method is invoked when the resource will idle until the specified [deadline]. - */ - public abstract fun onIdle(deadline: Long) - - /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. - */ - public abstract fun onConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer has finished. - */ - public abstract fun onFinish() - - /** - * Get the remaining work to process after a resource consumption. - * - * @param work The size of the resource consumption. - * @param speed The speed of consumption. - * @param duration The duration from the start of the consumption until now. - * @return The amount of work remaining. - */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - return if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, work - processed) - } else { - 0.0 - } - } - - /** - * Start the consumer. - */ - public fun start() { - check(state == SimResourceState.Pending) { "Consumer is already started" } - - val now = clock.millis() - - state = SimResourceState.Active - isProcessing = true - latestFlush = now - - try { - consumer.onEvent(this, SimResourceEvent.Start) - activeCommand = interpret(consumer.onNext(this), now) - } catch (cause: Throwable) { - doFail(cause) - } finally { - isProcessing = false - } - } - - /** - * Immediately stop the consumer. - */ - public fun stop() { - try { - isProcessing = true - latestFlush = clock.millis() - - flush(isIntermediate = true) - doStop() - } finally { - isProcessing = false - } - } - - override fun flush(isIntermediate: Boolean) { - // Flush is no-op when the consumer is finished or not yet started - if (state != SimResourceState.Active) { - return - } - - val now = clock.millis() - - // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. - if (isIntermediate && latestFlush >= now) { - return - } - - try { - val activeCommand = activeCommand ?: return - val (timestamp, command) = activeCommand - - // Note: accessor is reliant on activeCommand being set - val remainingWork = remainingWork - - isProcessing = true - - val duration = now - timestamp - assert(duration >= 0) { "Flush in the past" } - - this.activeCommand = when (command) { - is SimResourceCommand.Idle -> { - // We should only continue processing the next command if: - // 1. The resource consumer reached its deadline. - // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Idle(command.deadline), now) - } - } - is SimResourceCommand.Consume -> { - // We should only continue processing the next command if: - // 1. The resource consumption was finished. - // 2. The resource capacity cannot satisfy the demand. - // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) - } - } - SimResourceCommand.Exit -> - // Flush may not be called when the resource consumer has finished - throw IllegalStateException() - } - - // Flush remaining work cache - _remainingWorkFlush = Long.MIN_VALUE - } catch (cause: Throwable) { - doFail(cause) - } finally { - latestFlush = now - isProcessing = false - } - } - - override fun interrupt() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isProcessing) { - return - } - - scheduler.schedule(this, isIntermediate = false) - } - - override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" - - /** - * A flag to indicate that the resource is currently processing a command. - */ - private var isProcessing: Boolean = false - - /** - * The current command that is being processed. - */ - private var activeCommand: CommandWrapper? = null - - /** - * The latest timestamp at which the resource was flushed. - */ - private var latestFlush: Long = Long.MIN_VALUE - - /** - * Finish the consumer and resource provider. - */ - private fun doStop() { - val state = state - this.state = SimResourceState.Stopped - - if (state == SimResourceState.Active) { - activeCommand = null - try { - consumer.onEvent(this, SimResourceEvent.Exit) - onFinish() - } catch (cause: Throwable) { - doFail(cause) - } - } - } - - /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. - */ - private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { - when (command) { - is SimResourceCommand.Idle -> { - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = 0.0 - - onIdle(deadline) - consumer.onEvent(this, SimResourceEvent.Run) - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = min(capacity, limit) - onConsume(work, limit, deadline) - consumer.onEvent(this, SimResourceEvent.Run) - } - is SimResourceCommand.Exit -> { - speed = 0.0 - - doStop() - - // No need to set the next active command - return null - } - } - - return CommandWrapper(now, command) - } - - /** - * Request the workload for more work. - */ - private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) - - /** - * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. - */ - private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { - val (timestamp, command) = wrapper - val duration = now - timestamp - return when (command) { - is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) - is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 - } - } - - /** - * Fail the resource consumer. - */ - private fun doFail(cause: Throwable) { - state = SimResourceState.Stopped - activeCommand = null - - try { - consumer.onFailure(this, cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - e.printStackTrace() - } - - onFinish() - } - - /** - * Indicate that the capacity of the resource has changed. - */ - private fun onCapacityChange() { - // Do not inform the consumer if it has not been started yet - if (state != SimResourceState.Active) { - return - } - - val isThrottled = speed > capacity - - consumer.onEvent(this, SimResourceEvent.Capacity) - - // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. - // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). - if (isThrottled) { - flush(isIntermediate = true) - } - } - - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt new file mode 100644 index 00000000..de26f99e --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + +/** + * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. + */ +public abstract class SimAbstractResourceProvider( + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem?, + initialCapacity: Double +) : SimResourceProvider { + /** + * The capacity of the resource. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** + * The current processing speed of the resource. + */ + public override val speed: Double + get() = ctx?.speed ?: 0.0 + + /** + * The resource processing speed demand at this instant. + */ + public override val demand: Double + get() = ctx?.demand ?: 0.0 + + /** + * The resource counters to track the execution metrics of the resource. + */ + public override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + + /** + * The [SimResourceControllableContext] that is currently running. + */ + protected var ctx: SimResourceControllableContext? = null + private set + + /** + * The state of the resource provider. + */ + final override var state: SimResourceState = SimResourceState.Pending + private set + + /** + * Construct the [SimResourceProviderLogic] instance for a new consumer. + */ + protected abstract fun createLogic(): SimResourceProviderLogic + + /** + * Start the specified [SimResourceControllableContext]. + */ + protected open fun start(ctx: SimResourceControllableContext) { + ctx.start() + } + + /** + * Update the counters of the resource provider. + */ + protected fun updateCounters(ctx: SimResourceContext, work: Double) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += work + counters.actual += work - remainingWork + counters.overcommit += remainingWork + } + + final override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = interpreter.newContext(consumer, createLogic(), parent) + + ctx.capacity = capacity + this.ctx = ctx + this.state = SimResourceState.Active + + start(ctx) + } + + override fun close() { + cancel() + state = SimResourceState.Stopped + } + + final override fun interrupt() { + ctx?.interrupt() + } + + final override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.close() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + + override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt index bb4e6a2c..00972f43 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -25,24 +25,14 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. */ -public interface SimResourceAggregator : AutoCloseable { - /** - * The output resource provider to which resource consumers can be attached. - */ - public val output: SimResourceProvider - +public interface SimResourceAggregator : SimResourceProvider { /** * The input resources that will be switched between the output providers. */ public val inputs: Set<SimResourceProvider> /** - * Add the specified [input] to the switch. + * Add the specified [input] to the aggregator. */ public fun addInput(input: SimResourceProvider) - - /** - * End the lifecycle of the aggregator. - */ - public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 5665abd1..c39c1aca 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -25,7 +25,10 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] that distributes the load equally across the input resources. */ -public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) { +public class SimResourceAggregatorMaxMin( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null +) : SimAbstractResourceAggregator(interpreter, parent) { private val consumers = mutableListOf<Input>() override fun doConsume(work: Double, limit: Double, deadline: Long) { @@ -35,7 +38,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { val inputCapacity = input.ctx.capacity - val fraction = inputCapacity / outputContext.capacity + val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work @@ -53,7 +56,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA } } - override fun doFinish(cause: Throwable?) { + override fun doFinish() { val iterator = consumers.iterator() for (input in iterator) { iterator.remove() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 7c76c634..0d9a6106 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -45,6 +45,11 @@ public interface SimResourceContext { public val speed: Double /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** * The amount of work still remaining at this instant. */ public val remainingWork: Double diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt index cf0bbb28..ceaca39a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt @@ -20,30 +20,45 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.cpufreq - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.SimProcessingUnit -import org.opendc.simulator.compute.power.PowerModel +package org.opendc.simulator.resources /** - * A [ScalingDriver] that ignores the instructions of the [ScalingGovernor] and directly computes the power consumption - * based on the specified [power model][model]. + * A controllable [SimResourceContext]. + * + * This interface is used by resource providers to control the resource context. */ -public class SimpleScalingDriver(private val model: PowerModel) : ScalingDriver { - override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic { - override fun createContext(cpu: SimProcessingUnit): ScalingContext { - return object : ScalingContext { - override val machine: SimMachine = machine +public interface SimResourceControllableContext : SimResourceContext, AutoCloseable { + /** + * The state of the resource context. + */ + public val state: SimResourceState + + /** + * The capacity of the resource. + */ + public override var capacity: Double - override val cpu: SimProcessingUnit = cpu + /** + * Start the resource context. + */ + public fun start() - override fun setTarget(freq: Double) {} - } - } + /** + * Stop the resource context. + */ + public override fun close() - override fun computePower(): Double = model.computePower(machine.usage.value) + /** + * Invalidate the resource context's state. + * + * By invalidating the resource context's current state, the state is re-computed and the current progress is + * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the + * synchronous variant. + */ + public fun invalidate() - override fun toString(): String = "SimpleScalingDriver.Logic" - } + /** + * Synchronously flush the progress of the resource context and materialize its current progress. + */ + public fun flush() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt index f6a1a42e..725aa5bc 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt @@ -23,15 +23,26 @@ package org.opendc.simulator.resources /** - * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer. + * An interface that tracks cumulative counts of the work performed by a resource. */ -public interface SimResourceFlushable { +public interface SimResourceCounters { /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. + * The amount of work that resource consumers wanted the resource to perform. */ - public fun flush(isIntermediate: Boolean) + public val demand: Double + + /** + * The amount of work performed by the resource. + */ + public val actual: Double + + /** + * The amount of work that could not be completed due to overcommitted resources. + */ + public val overcommit: Double + + /** + * Reset the resource counters. + */ + public fun reset() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index b2759b7f..e0333ff9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -25,19 +25,14 @@ package org.opendc.simulator.resources /** * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ -public interface SimResourceDistributor : AutoCloseable { +public interface SimResourceDistributor : SimResourceConsumer { /** * The output resource providers to which resource consumers can be attached. */ public val outputs: Set<SimResourceProvider> /** - * The input resource that will be distributed over the consumers. + * Create a new output for the distributor. */ - public val input: SimResourceProvider - - /** - * Add an output to the switch with the specified [capacity]. - */ - public fun addOutput(capacity: Double): SimResourceProvider + public fun newOutput(): SimResourceProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index a76cb1e3..be9e89fb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -29,23 +29,22 @@ import kotlin.math.min * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. */ public class SimResourceDistributorMaxMin( - override val input: SimResourceProvider, - private val scheduler: SimResourceScheduler, - private val listener: Listener? = null + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null ) : SimResourceDistributor { override val outputs: Set<SimResourceProvider> get() = _outputs - private val _outputs = mutableSetOf<OutputProvider>() + private val _outputs = mutableSetOf<Output>() /** - * The active output contexts. + * The resource context of the consumer. */ - private val outputContexts: MutableList<OutputContext> = mutableListOf() + private var ctx: SimResourceContext? = null /** - * The total speed requested by the output resources. + * The active outputs. */ - private var totalRequestedSpeed = 0.0 + private val activeOutputs: MutableList<Output> = mutableListOf() /** * The total amount of work requested by the output resources. @@ -57,147 +56,83 @@ public class SimResourceDistributorMaxMin( */ private var totalAllocatedSpeed = 0.0 - /** - * The total allocated work requested for the output resources. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private val consumer = object : SimResourceConsumer { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext - - val remainingWork: Double - get() = ctx.remainingWork - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - } - SimResourceEvent.Exit -> { - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - else -> {} - } - } - } - - /** - * The total amount of remaining work. - */ - private val totalRemainingWork: Double - get() = consumer.remainingWork - - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Distributor has been closed" } - - val provider = OutputProvider(capacity) + /* SimResourceDistributor */ + override fun newOutput(): SimResourceProvider { + val provider = Output(ctx?.capacity ?: 0.0) _outputs.add(provider) return provider } - override fun close() { - if (!isClosed) { - isClosed = true - input.cancel() - } + /* SimResourceConsumer */ + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) } - init { - input.startConsumer(consumer) - } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + updateCapacity(ctx) + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - input.interrupt() + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() + + output.close() + } + } + SimResourceEvent.Capacity -> updateCapacity(ctx) + else -> {} + } } /** - * Schedule the work over the physical CPUs. + * Schedule the work of the outputs. */ - private fun doSchedule(capacity: Double): SimResourceCommand { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { + private fun doNext(capacity: Double): SimResourceCommand { + // If there is no work yet, mark the input as idle. + if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } - val maxUsage = capacity var duration: Double = Double.MAX_VALUE var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage + var availableSpeed = capacity var totalRequestedSpeed = 0.0 var totalRequestedWork = 0.0 - // Flush the work of the outputs - var outputIterator = outputContexts.listIterator() - while (outputIterator.hasNext()) { - val output = outputIterator.next() - - output.flush(isIntermediate = true) + // Pull in the work of the outputs + val outputIterator = activeOutputs.listIterator() + for (output in outputIterator) { + output.pull() - if (output.activeCommand == SimResourceCommand.Exit) { - // Apparently the output consumer has exited, so remove it from the scheduling queue. + // Remove outputs that have finished + if (output.isFinished) { outputIterator.remove() } } - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() + // Sort in-place the outputs based on their requested usage. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeOutputs.sort() // Divide the available input capacity fairly across the outputs using max-min fair sharing - outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() + var remaining = activeOutputs.size + for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- when (val command = output.activeCommand) { is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) - output.actualSpeed = 0.0 } is SimResourceCommand.Consume -> { val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) // Ignore idle computation @@ -212,216 +147,139 @@ public class SimResourceDistributorMaxMin( output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed - // The duration that we want to run is that of the shortest request from an output + // The duration that we want to run is that of the shortest request of an output duration = min(duration, command.work / grantedSpeed) } SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } } } - assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" } + assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } - this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = maxUsage - availableSpeed - this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + this.totalAllocatedSpeed = capacity - availableSpeed + val totalAllocatedWork = min( + totalRequestedWork, + totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) + ) return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline) else SimResourceCommand.Idle(deadline) } - /** - * Obtain the next command to perform. - */ - private fun doNext(capacity: Double): SimResourceCommand { - val totalRequestedWork = totalRequestedWork.toLong() - val totalRemainingWork = totalRemainingWork.toLong() - val totalAllocatedWork = totalAllocatedWork.toLong() - val totalRequestedSpeed = totalRequestedSpeed - val totalAllocatedSpeed = totalAllocatedSpeed - - // Force all inputs to re-schedule their work. - val command = doSchedule(capacity) - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork, - totalAllocatedWork - totalRemainingWork, - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalAllocatedSpeed, - totalRequestedSpeed - ) - - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - return command + private fun updateCapacity(ctx: SimResourceContext) { + for (output in _outputs) { + output.capacity = ctx.capacity + } } /** - * Event listener for hypervisor events. + * An internal [SimResourceProvider] implementation for switch outputs. */ - public interface Listener { + private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable<Output> { /** - * This method is invoked when a slice is finished. + * The current command that is processed by the resource. */ - public fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(val capacity: Double) : SimResourceProvider { /** - * The [OutputContext] that is currently running. + * The processing speed that is allowed by the model constraints. */ - private var ctx: OutputContext? = null + var allowedSpeed: Double = 0.0 - override var state: SimResourceState = SimResourceState.Pending - internal set + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + /** + * A flag to indicate that the output is finished. + */ + val isFinished + get() = activeCommand is SimResourceCommand.Exit - val ctx = OutputContext(this, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx + /** + * The timestamp at which we received the last command. + */ + private var lastCommandTimestamp: Long = Long.MIN_VALUE - ctx.start() - schedule() - } + /* SimAbstractResourceProvider */ + override fun createLogic(): SimResourceProviderLogic = this - override fun close() { - cancel() + override fun start(ctx: SimResourceControllableContext) { + activeOutputs += this - if (state != SimResourceState.Stopped) { - state = SimResourceState.Stopped - _outputs.remove(this) + interpreter.batch { + ctx.start() + // Interrupt the input to re-schedule the resources + this@SimResourceDistributorMaxMin.ctx?.interrupt() } } - override fun interrupt() { - ctx?.interrupt() - } + override fun close() { + val state = state - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } + super.close() if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending + _outputs.remove(this) } } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - val remainingWork = remainingWork - totalOvercommittedWork += remainingWork - } - - override fun onIdle(deadline: Long) { - reportOvercommit() + /* SimResourceProviderLogic */ + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) - } + lastCommandTimestamp = ctx.clock.millis() - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() + return Long.MAX_VALUE + } - allowedSpeed = speed + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + allowedSpeed = ctx.speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) + lastCommandTimestamp = ctx.clock.millis() + + return Long.MAX_VALUE } - override fun onFinish() { - reportOvercommit() + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + override fun onFinish(ctx: SimResourceControllableContext) { activeCommand = SimResourceCommand.Exit - provider.cancel() + lastCommandTimestamp = ctx.clock.millis() } - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - // Compute the remaining amount of work return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM + // Compute the fraction of compute time allocated to the output val fraction = actualSpeed / totalAllocatedSpeed - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) + // Compute the work that was actually granted to the output. + val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction + max(0.0, work - processingAvailable) } else { 0.0 } } - private var isProcessing: Boolean = false - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - try { - isProcessing = false + /* Comparable */ + override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed) - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } finally { - isProcessing = true + /** + * Pull the next command if necessary. + */ + fun pull() { + val ctx = ctx + if (ctx != null && lastCommandTimestamp < ctx.clock.millis()) { + ctx.flush() } } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt new file mode 100644 index 00000000..82631377 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The resource interpreter is responsible for managing the interaction between resource consumer and provider. + * + * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public interface SimResourceInterpreter { + /** + * The [Clock] associated with this interpreter. + */ + public val clock: Clock + + /** + * Create a new [SimResourceControllableContext] with the given [provider]. + * + * @param consumer The consumer logic. + * @param provider The logic of the resource provider. + * @param parent The system to which the resource context belongs. + */ + public fun newContext( + consumer: SimResourceConsumer, + provider: SimResourceProviderLogic, + parent: SimResourceSystem? = null + ): SimResourceControllableContext + + /** + * Start batching the execution of resource updates until [popBatch] is called. + * + * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs + * simultaneously) in a single state update. + * + * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the + * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called + * the same amount of times. To simplify batching, see [batch]. + */ + public fun pushBatch() + + /** + * Stop the batching of resource updates and run the interpreter on the batch. + * + * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. + */ + public fun popBatch() + + public companion object { + /** + * Construct a new [SimResourceInterpreter] implementation. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ + @JvmName("create") + public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter { + return SimResourceInterpreterImpl(context, clock) + } + } +} + +/** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ +public inline fun SimResourceInterpreter.batch(block: () -> Unit) { + try { + pushBatch() + block() + } finally { + popBatch() + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 2f567a5e..f709ca17 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,7 +27,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException /** - * A [SimResourceProvider] provides some resource of type [R]. + * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. */ public interface SimResourceProvider : AutoCloseable { /** @@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable { public val state: SimResourceState /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The current processing speed of the resource. + */ + public val speed: Double + + /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** + * The resource counters to track the execution metrics of the resource. + */ + public val counters: SimResourceCounters + + /** * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. * * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt new file mode 100644 index 00000000..5231ecf5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlin.math.max + +/** + * The logic of a resource provider. + */ +public interface SimResourceProviderLogic { + /** + * This method is invoked when the resource is reported to idle until the specified [deadline]. + * + * @param ctx The context in which the provider runs. + * @param deadline The deadline that was requested by the resource consumer. + * @return The instant at which to resume the consumer. + */ + public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long + + /** + * This method is invoked when the resource will be consumed until the specified amount of [work] was processed + * or [deadline] is reached. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + * @param limit The limit on the work rate of the resource consumer. + * @param deadline The deadline that was requested by the resource consumer. + * @return The instant at which to resume the consumer. + */ + public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long + + /** + * This method is invoked when the progress of the resource consumer is materialized. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + */ + public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {} + + /** + * This method is invoked when the resource consumer has finished. + */ + public fun onFinish(ctx: SimResourceControllableContext) + + /** + * Get the remaining work to process after a resource consumption. + * + * @param work The size of the resource consumption. + * @param speed The speed of consumption. + * @param duration The duration from the start of the consumption until now. + * @return The amount of work remaining. + */ + public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, work - processed) + } else { + 0.0 + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt deleted file mode 100644 index a228c47b..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock - -/** - * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource - * providers and consumers. - * - * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more - * efficiently, reducing the overall work needed per update. - */ -public interface SimResourceScheduler { - /** - * The [Clock] associated with this scheduler. - */ - public val clock: Clock - - /** - * Schedule a direct interrupt for the resource context represented by [flushable]. - * - * @param flushable The resource context that needs to be flushed. - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false) - - /** - * Schedule an interrupt in the future for the resource context represented by [flushable]. - * - * This method will override earlier calls to this method for the same [flushable]. - * - * @param flushable The resource context that needs to be flushed. - * @param timestamp The timestamp when the interrupt should happen. - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false) - - /** - * Batch the execution of several interrupts into a single call. - * - * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. - */ - public fun batch(block: () -> Unit) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt deleted file mode 100644 index cdbb4a6c..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.utils.TimerScheduler -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.CoroutineContext - -/** - * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after. - * - * @param clock The virtual simulation clock. - */ -public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler { - /** - * The [TimerScheduler] to actually schedule the interrupts. - */ - private val timers = TimerScheduler<Any>(context, clock) - - /** - * A flag to indicate that an interrupt is currently running already. - */ - private var isRunning: Boolean = false - - /** - * The queue of resources to be flushed. - */ - private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>() - - override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) { - queue.add(flushable to isIntermediate) - - if (isRunning) { - return - } - - flush() - } - - override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) { - timers.startSingleTimerTo(flushable, timestamp) { - schedule(flushable, isIntermediate) - } - } - - override fun batch(block: () -> Unit) { - val wasAlreadyRunning = isRunning - try { - isRunning = true - block() - } finally { - if (!wasAlreadyRunning) { - isRunning = false - } - } - } - - /** - * Flush the scheduled queue. - */ - private fun flush() { - val visited = mutableSetOf<SimResourceFlushable>() - try { - isRunning = true - while (queue.isNotEmpty()) { - val (flushable, isIntermediate) = queue.poll() - flushable.flush(isIntermediate) - visited.add(flushable) - } - } finally { - isRunning = false - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3277b889..9f062cc3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,98 +26,39 @@ import kotlin.math.ceil import kotlin.math.min /** - * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. + * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. * * @param initialCapacity The initial capacity of the resource. - * @param scheduler The scheduler to schedule the interrupts. + * @param interpreter The interpreter that is used for managing the resource contexts. + * @param parent The parent resource system. */ public class SimResourceSource( initialCapacity: Double, - private val scheduler: SimResourceScheduler -) : SimResourceProvider { - /** - * The current processing speed of the resource. - */ - public val speed: Double - get() = ctx?.speed ?: 0.0 - - /** - * The capacity of the resource. - */ - public var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The [Context] that is currently running. - */ - private var ctx: Context? = null - - override var state: SimResourceState = SimResourceState.Pending - private set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } - val ctx = Context(consumer) - - this.ctx = ctx - this.state = SimResourceState.Active - - ctx.start() - } - - override fun close() { - cancel() - state = SimResourceState.Stopped - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - - /** - * Internal implementation of [SimResourceContext] for this class. - */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) { - override fun onIdle(deadline: Long) { - // Do not resume if deadline is "infinite" - if (deadline != Long.MAX_VALUE) { - scheduler.schedule(this, deadline) + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null +) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + return deadline } - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - val until = min(deadline, clock.millis() + getDuration(work, speed)) - scheduler.schedule(this, until) - } - override fun onFinish() { - cancel() + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + return min(deadline, ctx.clock.millis() + getDuration(work, speed)) + } - ctx = null + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } - if (this@SimResourceSource.state != SimResourceState.Stopped) { - this@SimResourceSource.state = SimResourceState.Pending + override fun onFinish(ctx: SimResourceControllableContext) { + cancel() } } - - override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + override fun toString(): String = "SimResourceSource[capacity=$capacity]" + /** * Compute the duration that a resource consumption will take with the specified [speed]. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index 53fec16a..e224285e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable { public val inputs: Set<SimResourceProvider> /** - * Add an output to the switch with the specified [capacity]. + * The resource counters to track the execution metrics of all switch resources. */ - public fun addOutput(capacity: Double): SimResourceProvider + public val counters: SimResourceCounters + + /** + * Create a new output on the switch. + */ + public fun newOutput(): SimResourceProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 1a9dd0bc..2950af80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override val inputs: Set<SimResourceProvider> get() = _inputs - override fun addOutput(capacity: Double): SimResourceProvider { + override val counters: SimResourceCounters = object : SimResourceCounters { + override val demand: Double + get() = _inputs.sumOf { it.counters.demand } + override val actual: Double + get() = _inputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _inputs.sumOf { it.counters.overcommit } + + override fun reset() { + for (input in _inputs) { + input.counters.reset() + } + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(capacity, forwarder) + val output = Provider(forwarder) _outputs += output return output } @@ -84,13 +101,9 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { _inputs.forEach(SimResourceProvider::cancel) } - private inner class Provider( - private val capacity: Double, - private val forwarder: SimResourceTransformer - ) : SimResourceProvider by forwarder { + private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. - _outputs -= this availableResources += forwarder } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 5dc1e68d..684a1b52 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -22,23 +22,31 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.* - /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ public class SimResourceSwitchMaxMin( - scheduler: SimResourceScheduler, - private val listener: Listener? = null + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null ) : SimResourceSwitch { - private val _outputs = mutableSetOf<SimResourceProvider>() + /** + * The output resource providers to which resource consumers can be attached. + */ override val outputs: Set<SimResourceProvider> - get() = _outputs + get() = distributor.outputs - private val _inputs = mutableSetOf<SimResourceProvider>() + /** + * The input resources that will be switched between the output providers. + */ override val inputs: Set<SimResourceProvider> - get() = _inputs + get() = aggregator.inputs + + /** + * The resource counters to track the execution metrics of all switch resources. + */ + override val counters: SimResourceCounters + get() = aggregator.counters /** * A flag to indicate that the switch was closed. @@ -48,37 +56,24 @@ public class SimResourceSwitchMaxMin( /** * The aggregator to aggregate the resources. */ - private val aggregator = SimResourceAggregatorMaxMin(scheduler) + private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent) /** * The distributor to distribute the aggregated resources. */ - private val distributor = SimResourceDistributorMaxMin( - aggregator.output, scheduler, - object : SimResourceDistributorMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } - } - ) + private val distributor = SimResourceDistributorMaxMin(interpreter, parent) + + init { + aggregator.startConsumer(distributor) + } /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch. */ - override fun addOutput(capacity: Double): SimResourceProvider { + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = distributor.addOutput(capacity) - _outputs.add(provider) - return provider + return distributor.newOutput() } /** @@ -93,26 +88,7 @@ public class SimResourceSwitchMaxMin( override fun close() { if (!isClosed) { isClosed = true - distributor.close() aggregator.close() } } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt new file mode 100644 index 00000000..609262cb --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A system of possible multiple sub-resources. + * + * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the + * resource provider. + */ +public interface SimResourceSystem { + /** + * The parent system to which this system belongs or `null` if it has no parent. + */ + public val parent: SimResourceSystem? + + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param timestamp The timestamp at which the system converged. + */ + public fun onConverge(timestamp: Long) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 32f3f573..fd3d1230 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + /** * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. * @@ -53,6 +55,19 @@ public class SimResourceTransformer( override var state: SimResourceState = SimResourceState.Pending private set + override val capacity: Double + get() = ctx?.capacity ?: 0.0 + + override val speed: Double + get() = ctx?.speed ?: 0.0 + + override val demand: Double + get() = ctx?.demand ?: 0.0 + + override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } @@ -97,10 +112,15 @@ public class SimResourceTransformer( start() } + updateCounters(ctx) + return if (state == SimResourceState.Stopped) { SimResourceCommand.Exit } else if (delegate != null) { val command = transform(ctx, delegate.onNext(ctx)) + + _work = if (command is SimResourceCommand.Consume) command.work else 0.0 + if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards @@ -169,6 +189,22 @@ public class SimResourceTransformer( state = SimResourceState.Pending } } + + /** + * Counter to track the current submitted work. + */ + private var _work = 0.0 + + /** + * Update the resource counters for the transformer. + */ + private fun updateCounters(ctx: SimResourceContext) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += _work + counters.actual += _work - remainingWork + counters.overcommit += remainingWork + } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt new file mode 100644 index 00000000..46c5c63f --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -0,0 +1,422 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.* +import java.time.Clock +import kotlin.math.min + +/** + * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + */ +internal class SimResourceContextImpl( + override val parent: SimResourceSystem?, + private val interpreter: SimResourceInterpreterImpl, + private val consumer: SimResourceConsumer, + private val logic: SimResourceProviderLogic +) : SimResourceControllableContext, SimResourceSystem { + /** + * The clock of the context. + */ + override val clock: Clock + get() = interpreter.clock + + /** + * The capacity of the resource. + */ + override var capacity: Double = 0.0 + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** + * The amount of work still remaining at this instant. + */ + override val remainingWork: Double + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(now).also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + + /** + * A flag to indicate the state of the context. + */ + override val state: SimResourceState + get() = _state + private var _state = SimResourceState.Pending + + /** + * The current processing speed of the resource. + */ + override val speed: Double + get() = _speed + private var _speed = 0.0 + + /** + * The current resource processing demand. + */ + override val demand: Double + get() = _limit + + private val counters = object : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + /** + * The current state of the resource context. + */ + private var _timestamp: Long = Long.MIN_VALUE + private var _work: Double = 0.0 + private var _limit: Double = 0.0 + private var _deadline: Long = Long.MAX_VALUE + + /** + * The update flag indicating why the update was triggered. + */ + private var _flag: Flag = Flag.None + + /** + * The current pending update. + */ + private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null + + override fun start() { + check(_state == SimResourceState.Pending) { "Consumer is already started" } + interpreter.batch { + consumer.onEvent(this, SimResourceEvent.Start) + _state = SimResourceState.Active + interrupt() + } + } + + override fun close() { + if (_state != SimResourceState.Stopped) { + interpreter.batch { + _state = SimResourceState.Stopped + doStop() + } + } + } + + override fun interrupt() { + if (_state == SimResourceState.Stopped) { + return + } + + enableFlag(Flag.Interrupt) + scheduleUpdate() + } + + override fun invalidate() { + if (_state == SimResourceState.Stopped) { + return + } + + enableFlag(Flag.Invalidate) + scheduleUpdate() + } + + override fun flush() { + if (_state == SimResourceState.Stopped) { + return + } + + interpreter.scheduleSync(this) + } + + /** + * Determine whether the state of the resource context should be updated. + */ + fun requiresUpdate(timestamp: Long): Boolean { + // Either the resource context is flagged or there is a pending update at this timestamp + return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp + } + + /** + * Update the state of the resource context. + */ + fun doUpdate(timestamp: Long) { + try { + val oldState = _state + val newState = doUpdate(timestamp, oldState) + + _state = newState + _flag = Flag.None + + when (newState) { + SimResourceState.Pending -> + if (oldState != SimResourceState.Pending) { + throw IllegalStateException("Illegal transition to pending state") + } + SimResourceState.Stopped -> + if (oldState != SimResourceState.Stopped) { + doStop() + } + else -> {} + } + } catch (cause: Throwable) { + doFail(cause) + } finally { + _remainingWorkFlush = Long.MIN_VALUE + _timestamp = timestamp + } + } + + override fun onConverge(timestamp: Long) { + if (_state == SimResourceState.Active) { + consumer.onEvent(this, SimResourceEvent.Run) + } + } + + override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]" + + /** + * Update the state of the resource context. + */ + private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState { + return when (state) { + // Resource context is not active, so its state will not update + SimResourceState.Pending, SimResourceState.Stopped -> state + SimResourceState.Active -> { + val isInterrupted = _flag == Flag.Interrupt + val remainingWork = remainingWork + val isConsume = _limit > 0.0 + + // Update the resource counters only if there is some progress + if (timestamp > _timestamp) { + logic.onUpdate(this, _work) + } + + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource capacity cannot satisfy the demand. + // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { + next(timestamp) + } else if (isConsume) { + interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp) + } else { + interpret(SimResourceCommand.Idle(_deadline), timestamp) + } + } + } + } + + /** + * Stop the resource context. + */ + private fun doStop() { + try { + consumer.onEvent(this, SimResourceEvent.Exit) + logic.onFinish(this) + } catch (cause: Throwable) { + doFail(cause) + } + } + + /** + * Fail the resource consumer. + */ + private fun doFail(cause: Throwable) { + try { + consumer.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + logic.onFinish(this) + } + + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand, now: Long): SimResourceState { + return when (command) { + is SimResourceCommand.Idle -> { + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = deadline + + val timestamp = logic.onIdle(this, deadline) + scheduleUpdate(timestamp) + + SimResourceState.Active + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + _speed = min(capacity, limit) + _work = work + _limit = limit + _deadline = deadline + + val timestamp = logic.onConsume(this, work, limit, deadline) + scheduleUpdate(timestamp) + + SimResourceState.Active + } + is SimResourceCommand.Exit -> { + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = Long.MAX_VALUE + + SimResourceState.Stopped + } + } + } + + /** + * Request the workload for more work. + */ + private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the current state. + */ + private fun computeRemainingWork(now: Long): Double { + return if (_work > 0.0) + logic.getRemainingWork(this, _work, speed, now - _timestamp) + else 0.0 + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + + interpreter.batch { + // Inform the consumer of the capacity change. This might already trigger an interrupt. + consumer.onEvent(this, SimResourceEvent.Capacity) + + // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command. + if (isThrottled) { + invalidate() + } + } + } + + /** + * Enable the specified [flag] taking into account precedence. + */ + private fun enableFlag(flag: Flag) { + _flag = when (_flag) { + Flag.None -> flag + Flag.Invalidate -> + when (flag) { + Flag.None -> flag + else -> flag + } + Flag.Interrupt -> + when (flag) { + Flag.None, Flag.Invalidate -> flag + else -> flag + } + } + } + + /** + * Schedule an update for this resource context. + */ + private fun scheduleUpdate() { + // Cancel the pending update + val pendingUpdate = _pendingUpdate + if (pendingUpdate != null) { + _pendingUpdate = null + pendingUpdate.cancel() + } + + interpreter.scheduleImmediate(this) + } + + /** + * Schedule a delayed update for this resource context. + */ + private fun scheduleUpdate(timestamp: Long) { + val pendingUpdate = _pendingUpdate + if (pendingUpdate != null) { + if (pendingUpdate.timestamp == timestamp) { + // Fast-path: A pending update for the same timestamp already exists + return + } else { + // Cancel the old pending update + _pendingUpdate = null + pendingUpdate.cancel() + } + } + + if (timestamp != Long.MAX_VALUE) { + _pendingUpdate = interpreter.scheduleDelayed(this, timestamp) + } + } + + /** + * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or + * interrupted. + */ + enum class Flag { + None, + Interrupt, + Invalidate + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt index ddbe1ca0..827019c5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt @@ -20,17 +20,23 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.cpufreq +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.SimResourceCounters /** - * A CPUFreq [ScalingGovernor] that requests the frequency based on the utilization of the machine. + * Mutable implementation of the [SimResourceCounters] interface. */ -public class DemandScalingGovernor : ScalingGovernor { - override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { - override fun onLimit() { - ctx.setTarget(ctx.cpu.speed) - } +internal class SimResourceCountersImpl : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 - override fun toString(): String = "DemandScalingGovernor.Logic" + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt new file mode 100644 index 00000000..cb0d6160 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -0,0 +1,331 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Runnable +import org.opendc.simulator.resources.* +import java.time.Clock +import java.util.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ +internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The queue of resource updates that are scheduled for immediate execution. + */ + private val queue = ArrayDeque<Update>() + + /** + * A priority queue containing the resource updates to be scheduled in the future. + */ + private val futureQueue = PriorityQueue<Update>() + + /** + * The stack of interpreter invocations to occur in the future. + */ + private val futureInvocations = ArrayDeque<Invocation>() + + /** + * The systems that have been visited during the interpreter cycle. + */ + private val visited = linkedSetOf<SimResourceSystem>() + + /** + * The index in the batch stack. + */ + private var batchIndex = 0 + + /** + * A flag to indicate that the interpreter is currently active. + */ + private val isRunning: Boolean + get() = batchIndex > 0 + + /** + * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. + * + * This method should be used when the state of a resource context is invalidated/interrupted and needs to be + * re-computed. In case no interpreter is currently active, the interpreter will be started. + */ + fun scheduleImmediate(ctx: SimResourceContextImpl) { + queue.add(Update(ctx, Long.MIN_VALUE)) + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + + /** + * Update the specified [ctx] synchronously. + */ + fun scheduleSync(ctx: SimResourceContextImpl) { + ctx.doUpdate(clock.millis()) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + + /** + * Schedule the interpreter to run at [timestamp] to update the resource contexts. + * + * This method will override earlier calls to this method for the same [ctx]. + * + * @param ctx The resource context to which the event applies. + * @param timestamp The timestamp when the interrupt should happen. + */ + fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update { + val now = clock.millis() + val futureQueue = futureQueue + + require(timestamp >= now) { "Timestamp must be in the future" } + + val update = Update(ctx, timestamp) + futureQueue.add(update) + + // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference. + if (futureQueue.peek() === update) { + trySchedule(futureQueue, futureInvocations) + } + + return update + } + + override fun newContext( + consumer: SimResourceConsumer, + provider: SimResourceProviderLogic, + parent: SimResourceSystem? + ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider) + + override fun pushBatch() { + batchIndex++ + } + + override fun popBatch() { + try { + // Flush the work if the platform is not already running + if (batchIndex == 1 && queue.isNotEmpty()) { + runInterpreter() + } + } finally { + batchIndex-- + } + } + + /** + * Interpret all actions that are scheduled for the current timestamp. + */ + private fun runInterpreter() { + val now = clock.millis() + val queue = queue + val futureQueue = futureQueue + val futureInvocations = futureInvocations + val visited = visited + + // Execute all scheduled updates at current timestamp + while (true) { + val update = futureQueue.peek() ?: break + + assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" } + + if (update.timestamp > now && !update.isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(futureQueue, futureInvocations) + break + } + + futureQueue.poll() + + if (update(now) && visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } + } + + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val update = queue.poll() ?: break + if (update(now) && visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } + } + + for (system in visited) { + system.onConverge(now) + } + + visited.clear() + } while (queue.isNotEmpty()) + } + + /** + * Try to schedule the next interpreter event. + */ + private fun trySchedule(queue: PriorityQueue<Update>, scheduled: ArrayDeque<Invocation>) { + val nextTimer = queue.peek() + val now = clock.millis() + + // Check whether we need to update our schedule: + if (nextTimer == null) { + // Case 1: all timers are cancelled + for (invocation in scheduled) { + invocation.cancel() + } + scheduled.clear() + return + } + + while (true) { + val invocation = scheduled.peekFirst() + if (invocation == null || invocation.timestamp > nextTimer.timestamp) { + // Case 2: A new timer was registered ahead of the other timers. + // Solution: Schedule a new scheduler invocation + val nextTimestamp = nextTimer.timestamp + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout( + nextTimestamp - now, + { + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + }, + context + ) + scheduled.addFirst(Invocation(nextTimestamp, handle)) + break + } else if (invocation.timestamp < nextTimer.timestamp) { + // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted + // Solution: Cancel the next scheduler invocation + invocation.cancel() + scheduled.pollFirst() + } else { + break + } + } + } + + /** + * Collect all the ancestors of the specified [system]. + */ + private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) { + val parent = system.parent + if (parent != null) { + systems.add(parent) + collectAncestors(parent, systems) + } + } + + /** + * A future interpreter invocation. + * + * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private data class Invocation( + @JvmField val timestamp: Long, + @JvmField private val disposableHandle: DisposableHandle + ) { + /** + * Cancel the interpreter invocation. + */ + fun cancel() = disposableHandle.dispose() + } + + /** + * An update call for [ctx] that is scheduled for [timestamp]. + * + * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be + * cancelled if the resource context was invalidated in the meantime. + */ + class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable<Update> { + /** + * A flag to indicate that the task has been cancelled. + */ + @JvmField + var isCancelled: Boolean = false + + /** + * Cancel the update. + */ + fun cancel() { + isCancelled = true + } + + /** + * Immediately run update. + */ + operator fun invoke(timestamp: Long): Boolean { + val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp) + if (shouldExecute) { + ctx.doUpdate(timestamp) + } + return shouldExecute + } + + override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp) + + override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]" + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 2b32300e..51024e80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceAggregatorMaxMin] class. @@ -41,7 +42,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimResourceAggregatorMaxMinTest { @Test fun testSingleCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val forwarder = SimResourceForwarder() @@ -58,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest { source.startConsumer(adapter) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertAll( @@ -66,13 +67,13 @@ internal class SimResourceAggregatorMaxMinTest { { assertEquals(listOf(0.0, 0.5, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -86,20 +87,20 @@ internal class SimResourceAggregatorMaxMinTest { val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(adapter) + aggregator.consume(adapter) yield() assertAll( { assertEquals(1000, clock.millis()) }, { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testOvercommit() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -114,19 +115,19 @@ internal class SimResourceAggregatorMaxMinTest { .andThen(SimResourceCommand.Exit) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertEquals(1000, clock.millis()) verify(exactly = 2) { consumer.onNext(any()) } } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testException() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -141,17 +142,17 @@ internal class SimResourceAggregatorMaxMinTest { .andThenThrows(IllegalStateException("Test Exception")) try { - assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } + assertThrows<IllegalStateException> { aggregator.consume(consumer) } yield() assertEquals(SimResourceState.Pending, sources[0].state) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -163,20 +164,20 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(4.0, 1.0) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(1000) sources[0].capacity = 0.5 } yield() assertEquals(2334, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -188,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(1.0, 0.5) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(500) sources[0].capacity = 0.5 } yield() assertEquals(1000, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() + } + } + + @Test + fun testCounters() = runBlockingSimulation { + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(scheduler) + val sources = listOf( + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } + } finally { + aggregator.close() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 2e2d6588..6cb507ce 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -26,98 +26,109 @@ import io.mockk.* import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.impl.SimResourceContextImpl +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** - * A test suite for the [SimAbstractResourceContext] class. + * A test suite for the [SimResourceContextImpl] class. */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) - context.flush(isIntermediate = false) + context.doUpdate(interpreter.clock.millis()) } @Test fun testIntermediateFlush() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline }) + val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path - context.flush(isIntermediate = true) + context.doUpdate(interpreter.clock.millis()) - verify(exactly = 2) { context.onConsume(any(), any(), any()) } + verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } } @Test fun testIntermediateFlushIdle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline }) + val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() delay(5) - context.flush(isIntermediate = true) + context.invalidate() delay(5) - context.flush(isIntermediate = true) + context.invalidate() assertAll( - { verify(exactly = 2) { context.onIdle(any()) } }, - { verify(exactly = 1) { context.onFinish() } } + { verify(exactly = 2) { logic.onIdle(any(), any()) } }, + { verify(exactly = 1) { logic.onFinish(any()) } } ) } @Test fun testDoubleStart() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() - assertThrows<IllegalStateException> { context.start() } + + assertThrows<IllegalStateException> { + context.start() + } } @Test fun testIdempodentCapacityChange() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) + context.capacity = 4200.0 context.start() context.capacity = 4200.0 @@ -126,17 +137,19 @@ class SimResourceContextTest { @Test fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} - } + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + }) + + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 5e86088d..08d88093 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceSource] class. @@ -40,7 +41,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimResourceSourceTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -63,7 +64,7 @@ class SimResourceSourceTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -83,7 +84,7 @@ class SimResourceSourceTest { @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -110,7 +111,7 @@ class SimResourceSourceTest { */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -133,7 +134,7 @@ class SimResourceSourceTest { @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) lateinit var resCtx: SimResourceContext @@ -174,7 +175,7 @@ class SimResourceSourceTest { @Test fun testFailure() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -193,7 +194,7 @@ class SimResourceSourceTest { @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -213,7 +214,7 @@ class SimResourceSourceTest { @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -236,7 +237,7 @@ class SimResourceSourceTest { @Test fun testClosedConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -257,7 +258,7 @@ class SimResourceSourceTest { @Test fun testCloseDuringConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -279,7 +280,7 @@ class SimResourceSourceTest { @Test fun testIdle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -301,7 +302,7 @@ class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows<IllegalStateException> { runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -321,7 +322,7 @@ class SimResourceSourceTest { @Test fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 32b6d8ad..ad8d82e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -44,7 +45,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val speed = mutableListOf<Double>() @@ -66,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest { source.startConsumer(adapter) switch.addInput(forwarder) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -86,7 +87,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -97,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -113,7 +114,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer { @@ -141,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -158,7 +159,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -169,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - switch.addOutput(3200.0) - assertThrows<IllegalStateException> { switch.addOutput(3200.0) } + switch.newOutput() + assertThrows<IllegalStateException> { switch.newOutput() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index e7dec172..e4292ec0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitch] implementations @@ -40,13 +41,13 @@ import org.opendc.simulator.resources.consumer.SimTraceConsumer internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val switch = SimResourceSwitchMaxMin(scheduler) val sources = List(2) { SimResourceSource(2000.0, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(1000.0) + val provider = switch.newOutput() val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit @@ -64,27 +65,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L val workload = @@ -97,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(scheduler, listener) - val provider = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val provider = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -109,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest { } assertAll( - { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -121,27 +102,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L val workloadA = @@ -163,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(scheduler, listener) - val providerA = switch.addOutput(3200.0) - val providerB = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val providerA = switch.newOutput() + val providerB = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -180,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest { switch.close() } assertAll( - { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 880e1755..810052b8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceTransformer] class. @@ -41,7 +42,7 @@ internal class SimResourceTransformerTest { @Test fun testExitImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) launch { @@ -61,7 +62,7 @@ internal class SimResourceTransformerTest { @Test fun testExit() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) launch { @@ -122,7 +123,7 @@ internal class SimResourceTransformerTest { @Test fun testCancelStartedDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -141,7 +142,7 @@ internal class SimResourceTransformerTest { @Test fun testCancelPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -160,7 +161,7 @@ internal class SimResourceTransformerTest { @Test fun testExitPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -176,7 +177,7 @@ internal class SimResourceTransformerTest { @Test fun testAdjustCapacity() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -195,7 +196,7 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -205,4 +206,21 @@ internal class SimResourceTransformerTest { assertEquals(0, clock.millis()) verify(exactly = 1) { consumer.onNext(any()) } } + + @Test + fun testCounters() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) + + val consumer = SimWorkConsumer(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index ac8b5814..db4fe856 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimWorkConsumer] class. @@ -35,7 +36,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimWorkConsumerTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 1.0) @@ -50,7 +51,7 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 0.5) |
