diff options
34 files changed, 627 insertions, 441 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index f08a7e1e..89016c55 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -112,7 +112,7 @@ public class SimHost( */ public val hypervisor: SimHypervisor = hypervisor.create( interpreter, - object : SimHypervisor.Listener { + listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, requestedWork: Long, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 893e8bcd..4b21b4f7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -114,7 +114,7 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207112418947, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt index 2df307d3..2b84a17c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt @@ -166,13 +166,23 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource /** * The actual resource supporting the processing unit. */ - private val source = switch.addOutput(model.frequency) - - override val speed: Double = 0.0 /* TODO Implement */ + private val source = switch.newOutput() override val state: SimResourceState get() = source.state + override val capacity: Double + get() = source.capacity + + override val speed: Double + get() = source.speed + + override val demand: Double + get() = source.demand + + override val counters: SimResourceCounters + get() = source.counters + override fun startConsumer(consumer: SimResourceConsumer) { source.startConsumer(consumer) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index de2b3eef..d40cdac5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -37,8 +37,12 @@ import java.time.Clock * Abstract implementation of the [SimMachine] interface. * * @param interpreter The interpreter to manage the machine's resources. + * @param parent The parent simulation system. */ -public abstract class SimAbstractMachine(protected val interpreter: SimResourceInterpreter) : SimMachine, SimResourceSystem { +public abstract class SimAbstractMachine( + protected val interpreter: SimResourceInterpreter, + final override val parent: SimResourceSystem? +) : SimMachine, SimResourceSystem { private val _usage = MutableStateFlow(0.0) override val usage: StateFlow<Double> get() = _usage diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index f5218ba9..082719e2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -28,25 +28,27 @@ import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.resources.* import org.opendc.simulator.resources.SimResourceInterpreter -import kotlin.coroutines.* /** * A simulated bare-metal machine that is able to run a single workload. * * A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For - * example. the class expects only a single concurrent call to [run]. + * example. The class expects only a single concurrent call to [run]. * - * @param context The [CoroutineContext] to run the simulated workload in. - * @param clock The virtual clock to track the simulation time. + * @param interpreter The [SimResourceInterpreter] to drive the simulation. * @param model The machine model to simulate. + * @param scalingGovernor The CPU frequency scaling governor to use. + * @param scalingDriver The CPU frequency scaling driver to use. + * @param parent The parent simulation system. */ @OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) public class SimBareMetalMachine( - platform: SimResourceInterpreter, + interpreter: SimResourceInterpreter, override val model: SimMachineModel, scalingGovernor: ScalingGovernor, - scalingDriver: ScalingDriver -) : SimAbstractMachine(platform) { + scalingDriver: ScalingDriver, + parent: SimResourceSystem? = null, +) : SimAbstractMachine(interpreter, parent) { override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) } /** @@ -61,8 +63,6 @@ public class SimBareMetalMachine( scalingGovernor.createLogic(this.scalingDriver.createContext(cpu)) } - override val parent: SimResourceSystem? = null - init { scalingGovernors.forEach { it.onStart() } } @@ -89,11 +89,20 @@ public class SimBareMetalMachine( */ private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine) + override val state: SimResourceState + get() = source.state + + override val capacity: Double + get() = source.capacity + override val speed: Double get() = source.speed - override val state: SimResourceState - get() = source.state + override val demand: Double + get() = source.demand + + override val counters: SimResourceCounters + get() = source.counters override fun startConsumer(consumer: SimResourceConsumer) { source.startConsumer(consumer) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index 33e7e637..3ceb3291 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -26,33 +26,62 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.simulator.resources.SimResourceSwitch import org.opendc.simulator.resources.SimResourceSwitchMaxMin +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single * [SimBareMetalMachine] concurrently using weighted fair sharing. * + * @param interpreter The interpreter to manage the machine's resources. + * @param parent The parent simulation system. * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val interpreter: SimResourceInterpreter, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor(interpreter) { +public class SimFairShareHypervisor( + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null, + private val listener: SimHypervisor.Listener? = null +) : SimAbstractHypervisor(interpreter) { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SimResourceSwitchMaxMin( - interpreter, null, - object : SimResourceSwitchMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } + return SwitchSystem().switch + } + + private inner class SwitchSystem : SimResourceSystem { + val switch = SimResourceSwitchMaxMin(interpreter, this) + + override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent + + private var lastCpuUsage = 0.0 + private var lastCpuDemand = 0.0 + private var lastDemand = 0.0 + private var lastActual = 0.0 + private var lastOvercommit = 0.0 + private var lastReport = Long.MIN_VALUE + + override fun onConverge(timestamp: Long) { + val listener = listener ?: return + val counters = switch.counters + + if (timestamp > lastReport) { + listener.onSliceFinish( + this@SimFairShareHypervisor, + (counters.demand - lastDemand).toLong(), + (counters.actual - lastActual).toLong(), + (counters.overcommit - lastOvercommit).toLong(), + 0L, + lastCpuUsage, + lastCpuDemand + ) } - ) + lastReport = timestamp + + lastCpuDemand = switch.inputs.sumOf { it.demand } + lastCpuUsage = switch.inputs.sumOf { it.speed } + lastDemand = counters.demand + lastActual = counters.actual + lastOvercommit = counters.overcommit + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index 68858cc1..d3206196 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" - override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor { - return SimFairShareHypervisor(interpreter, listener) - } + override fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem?, + listener: SimHypervisor.Listener? + ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt index d0753084..8e8c3698 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A service provider interface for constructing a [SimHypervisor]. @@ -39,5 +40,9 @@ public interface SimHypervisorProvider { /** * Create a [SimHypervisor] instance with the specified [listener]. */ - public fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener? = null): SimHypervisor + public fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null, + listener: SimHypervisor.Listener? = null + ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt index 13c7d9b2..136a543a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt @@ -33,9 +33,4 @@ public interface SimProcessingUnit : SimResourceProvider { * The model representing the static properties of the processing unit. */ public val model: ProcessingUnit - - /** - * The current speed of the processing unit. - */ - public val speed: Double } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index c017c8ab..923b5bab 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.resources.SimResourceSystem /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor { - return SimSpaceSharedHypervisor(interpreter) - } + override fun create( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem?, + listener: SimHypervisor.Listener? + ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index c1b5089c..1709cc23 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -96,7 +96,7 @@ internal class SimHypervisorTest { val platform = SimResourceInterpreter(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, listener) + val hypervisor = SimFairShareHypervisor(platform, null, listener) launch { machine.run(hypervisor) @@ -171,7 +171,7 @@ internal class SimHypervisorTest { platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, listener) + val hypervisor = SimFairShareHypervisor(platform, null, listener) launch { machine.run(hypervisor) diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index 9233c72d..b45b2a2f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -90,7 +90,7 @@ class SimResourceBenchmarks { switch.addInput(SimResourceSource(3000.0, interpreter)) switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -105,7 +105,7 @@ class SimResourceBenchmarks { repeat(3) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } @@ -120,7 +120,7 @@ class SimResourceBenchmarks { switch.addInput(SimResourceSource(3000.0, interpreter)) switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -135,7 +135,7 @@ class SimResourceBenchmarks { repeat(2) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index be04d399..5fe7d7bb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -42,7 +42,7 @@ public abstract class SimAbstractResourceAggregator( /** * This method is invoked when the resource consumer finishes processing. */ - protected abstract fun doFinish(cause: Throwable?) + protected abstract fun doFinish() /** * This method is invoked when an input context is started. @@ -54,8 +54,9 @@ public abstract class SimAbstractResourceAggregator( */ protected abstract fun onInputFinished(input: Input) + /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { - check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + check(state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) @@ -63,44 +64,77 @@ public abstract class SimAbstractResourceAggregator( input.startConsumer(consumer) } - override fun close() { - output.close() - } - - override val output: SimResourceProvider - get() = _output - private val _output = SimResourceForwarder() - override val inputs: Set<SimResourceProvider> get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() private val _inputConsumers = mutableListOf<Consumer>() - protected val outputContext: SimResourceContext - get() = context - private val context = interpreter.newContext( - _output, - object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - doIdle(deadline) - return Long.MAX_VALUE - } + /* SimResourceProvider */ + override val state: SimResourceState + get() = _output.state - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - doConsume(work, limit, deadline) - return Long.MAX_VALUE - } + override val capacity: Double + get() = _output.capacity - override fun onFinish(ctx: SimResourceControllableContext) { - doFinish(null) - } + override val speed: Double + get() = _output.speed + + override val demand: Double + get() = _output.demand + + override val counters: SimResourceCounters + get() = _output.counters + + override fun startConsumer(consumer: SimResourceConsumer) { + _output.startConsumer(consumer) + } + + override fun cancel() { + _output.cancel() + } + + override fun interrupt() { + _output.interrupt() + } + + override fun close() { + _output.close() + } + + private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + doIdle(deadline) + return Long.MAX_VALUE + } + + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + doConsume(work, limit, deadline) + return Long.MAX_VALUE + } + + override fun onFinish(ctx: SimResourceControllableContext) { + doFinish() + } + + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } - override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return _inputConsumers.sumOf { it.remainingWork } + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return _inputConsumers.sumOf { it.remainingWork } + } } - }, - parent - ) + } + + /** + * Flush the progress of the output if possible. + */ + fun flush() { + ctx?.flush() + } + } /** * An input for the resource aggregator. @@ -141,7 +175,7 @@ public abstract class SimAbstractResourceAggregator( private fun updateCapacity() { // Adjust capacity of output resource - context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } + _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } } /* Input */ @@ -158,7 +192,7 @@ public abstract class SimAbstractResourceAggregator( this.command = null next } else { - context.flush() + _output.flush() next = command this.command = null @@ -172,11 +206,6 @@ public abstract class SimAbstractResourceAggregator( _ctx = ctx updateCapacity() - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - onInputStarted(this) } SimResourceEvent.Capacity -> updateCapacity() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 519c2615..de26f99e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -22,27 +22,49 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + /** * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. */ public abstract class SimAbstractResourceProvider( private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, + private val parent: SimResourceSystem?, initialCapacity: Double ) : SimResourceProvider { /** * The capacity of the resource. */ - public open var capacity: Double = initialCapacity - protected set(value) { + public override var capacity: Double = initialCapacity + set(value) { field = value ctx?.capacity = value } /** + * The current processing speed of the resource. + */ + public override val speed: Double + get() = ctx?.speed ?: 0.0 + + /** + * The resource processing speed demand at this instant. + */ + public override val demand: Double + get() = ctx?.demand ?: 0.0 + + /** + * The resource counters to track the execution metrics of the resource. + */ + public override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + + /** * The [SimResourceControllableContext] that is currently running. */ protected var ctx: SimResourceControllableContext? = null + private set /** * The state of the resource provider. @@ -62,6 +84,17 @@ public abstract class SimAbstractResourceProvider( ctx.start() } + /** + * Update the counters of the resource provider. + */ + protected fun updateCounters(ctx: SimResourceContext, work: Double) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += work + counters.actual += work - remainingWork + counters.overcommit += remainingWork + } + final override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } val ctx = interpreter.newContext(consumer, createLogic(), parent) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt index 5c0346cd..00972f43 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -25,12 +25,7 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. */ -public interface SimResourceAggregator : AutoCloseable { - /** - * The output resource provider to which resource consumers can be attached. - */ - public val output: SimResourceProvider - +public interface SimResourceAggregator : SimResourceProvider { /** * The input resources that will be switched between the output providers. */ @@ -40,9 +35,4 @@ public interface SimResourceAggregator : AutoCloseable { * Add the specified [input] to the aggregator. */ public fun addInput(input: SimResourceProvider) - - /** - * End the lifecycle of the aggregator. - */ - public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index bdab6def..c39c1aca 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -38,7 +38,7 @@ public class SimResourceAggregatorMaxMin( // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { val inputCapacity = input.ctx.capacity - val fraction = inputCapacity / outputContext.capacity + val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work @@ -56,7 +56,7 @@ public class SimResourceAggregatorMaxMin( } } - override fun doFinish(cause: Throwable?) { + override fun doFinish() { val iterator = consumers.iterator() for (input in iterator) { iterator.remove() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 7c76c634..0d9a6106 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -45,6 +45,11 @@ public interface SimResourceContext { public val speed: Double /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** * The amount of work still remaining at this instant. */ public val remainingWork: Double diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt new file mode 100644 index 00000000..725aa5bc --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * An interface that tracks cumulative counts of the work performed by a resource. + */ +public interface SimResourceCounters { + /** + * The amount of work that resource consumers wanted the resource to perform. + */ + public val demand: Double + + /** + * The amount of work performed by the resource. + */ + public val actual: Double + + /** + * The amount of work that could not be completed due to overcommitted resources. + */ + public val overcommit: Double + + /** + * Reset the resource counters. + */ + public fun reset() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index 8dd1bd2b..e0333ff9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -25,19 +25,14 @@ package org.opendc.simulator.resources /** * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ -public interface SimResourceDistributor : AutoCloseable { +public interface SimResourceDistributor : SimResourceConsumer { /** * The output resource providers to which resource consumers can be attached. */ public val outputs: Set<SimResourceProvider> /** - * The input resource that will be distributed over the consumers. + * Create a new output for the distributor. */ - public val input: SimResourceProvider - - /** - * Create a new output for the distributor with the specified [capacity]. - */ - public fun addOutput(capacity: Double): SimResourceProvider + public fun newOutput(): SimResourceProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index e99f5eff..be9e89fb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -29,24 +29,22 @@ import kotlin.math.min * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. */ public class SimResourceDistributorMaxMin( - override val input: SimResourceProvider, private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, - private val listener: Listener? = null + private val parent: SimResourceSystem? = null ) : SimResourceDistributor { override val outputs: Set<SimResourceProvider> get() = _outputs private val _outputs = mutableSetOf<Output>() /** - * The active outputs. + * The resource context of the consumer. */ - private val activeOutputs: MutableList<Output> = mutableListOf() + private var ctx: SimResourceContext? = null /** - * The total speed requested by the output resources. + * The active outputs. */ - private var totalRequestedSpeed = 0.0 + private val activeOutputs: MutableList<Output> = mutableListOf() /** * The total amount of work requested by the output resources. @@ -58,145 +56,83 @@ public class SimResourceDistributorMaxMin( */ private var totalAllocatedSpeed = 0.0 - /** - * The total allocated work requested for the output resources. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * The timestamp of the last report. - */ - private var lastReport: Long = Long.MIN_VALUE - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private val consumer = object : SimResourceConsumer { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext - - val remainingWork: Double - get() = ctx.remainingWork - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - } - SimResourceEvent.Exit -> { - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - else -> {} - } - } + /* SimResourceDistributor */ + override fun newOutput(): SimResourceProvider { + val provider = Output(ctx?.capacity ?: 0.0) + _outputs.add(provider) + return provider } - /** - * The total amount of remaining work. - */ - private val totalRemainingWork: Double - get() = consumer.remainingWork - - init { - input.startConsumer(consumer) + /* SimResourceConsumer */ + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) } - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Distributor has been closed" } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + updateCapacity(ctx) + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() - val provider = Output(capacity) - _outputs.add(provider) - return provider - } + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() - override fun close() { - if (!isClosed) { - isClosed = true - input.cancel() + output.close() + } + } + SimResourceEvent.Capacity -> updateCapacity(ctx) + else -> {} } } /** - * Schedule the work over the physical CPUs. + * Schedule the work of the outputs. */ - private fun doSchedule(capacity: Double): SimResourceCommand { - // If there is no work yet, mark all inputs as idle. + private fun doNext(capacity: Double): SimResourceCommand { + // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } - val maxUsage = capacity var duration: Double = Double.MAX_VALUE var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage + var availableSpeed = capacity var totalRequestedSpeed = 0.0 var totalRequestedWork = 0.0 - // Flush the work of the outputs - var outputIterator = activeOutputs.listIterator() - while (outputIterator.hasNext()) { - val output = outputIterator.next() - + // Pull in the work of the outputs + val outputIterator = activeOutputs.listIterator() + for (output in outputIterator) { output.pull() + // Remove outputs that have finished if (output.isFinished) { - // The output consumer has exited, so remove it from the scheduling queue. outputIterator.remove() } } - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + // Sort in-place the outputs based on their requested usage. + // Profiling shows that it is faster than maintaining some kind of sorted set. activeOutputs.sort() // Divide the available input capacity fairly across the outputs using max-min fair sharing - outputIterator = activeOutputs.listIterator() var remaining = activeOutputs.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() + for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- when (val command = output.activeCommand) { is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) - output.actualSpeed = 0.0 } is SimResourceCommand.Consume -> { val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) // Ignore idle computation @@ -211,7 +147,7 @@ public class SimResourceDistributorMaxMin( output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed - // The duration that we want to run is that of the shortest request from an output + // The duration that we want to run is that of the shortest request of an output duration = min(duration, command.work / grantedSpeed) } SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } @@ -220,67 +156,23 @@ public class SimResourceDistributorMaxMin( assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } - this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = maxUsage - availableSpeed - this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)) + this.totalAllocatedSpeed = capacity - availableSpeed + val totalAllocatedWork = min( + totalRequestedWork, + totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) + ) return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline) else SimResourceCommand.Idle(deadline) } - /** - * Obtain the next command to perform. - */ - private fun doNext(capacity: Double): SimResourceCommand { - val totalRequestedWork = totalRequestedWork.toLong() - val totalAllocatedWork = totalAllocatedWork.toLong() - val totalRemainingWork = totalRemainingWork.toLong() - val totalRequestedSpeed = totalRequestedSpeed - val totalAllocatedSpeed = totalAllocatedSpeed - - // Force all inputs to re-schedule their work. - val command = doSchedule(capacity) - - val now = interpreter.clock.millis() - if (lastReport < now) { - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork, - totalAllocatedWork - totalRemainingWork, - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalAllocatedSpeed, - totalRequestedSpeed - ) - lastReport = now + private fun updateCapacity(ctx: SimResourceContext) { + for (output in _outputs) { + output.capacity = ctx.capacity } - - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - return command - } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) } /** @@ -322,7 +214,7 @@ public class SimResourceDistributorMaxMin( interpreter.batch { ctx.start() // Interrupt the input to re-schedule the resources - input.interrupt() + this@SimResourceDistributorMaxMin.ctx?.interrupt() } } @@ -338,8 +230,6 @@ public class SimResourceDistributorMaxMin( /* SimResourceProviderLogic */ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - reportOvercommit(ctx.remainingWork) - allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) lastCommandTimestamp = ctx.clock.millis() @@ -348,8 +238,6 @@ public class SimResourceDistributorMaxMin( } override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - reportOvercommit(ctx.remainingWork) - allowedSpeed = ctx.speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) lastCommandTimestamp = ctx.clock.millis() @@ -357,31 +245,25 @@ public class SimResourceDistributorMaxMin( return Long.MAX_VALUE } - override fun onFinish(ctx: SimResourceControllableContext) { - reportOvercommit(ctx.remainingWork) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + override fun onFinish(ctx: SimResourceControllableContext) { activeCommand = SimResourceCommand.Exit lastCommandTimestamp = ctx.clock.millis() } override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 + val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - // Compute the remaining amount of work return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM + // Compute the fraction of compute time allocated to the output val fraction = actualSpeed / totalAllocatedSpeed - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) + // Compute the work that was actually granted to the output. + val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction + max(0.0, work - processingAvailable) } else { 0.0 } @@ -399,9 +281,5 @@ public class SimResourceDistributorMaxMin( ctx.flush() } } - - private fun reportOvercommit(remainingWork: Double) { - totalOvercommittedWork += remainingWork - } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 2f567a5e..f709ca17 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,7 +27,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException /** - * A [SimResourceProvider] provides some resource of type [R]. + * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. */ public interface SimResourceProvider : AutoCloseable { /** @@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable { public val state: SimResourceState /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The current processing speed of the resource. + */ + public val speed: Double + + /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** + * The resource counters to track the execution metrics of the resource. + */ + public val counters: SimResourceCounters + + /** * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. * * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 22676984..5231ecf5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -29,7 +29,7 @@ import kotlin.math.max */ public interface SimResourceProviderLogic { /** - * This method is invoked when the resource will idle until the specified [deadline]. + * This method is invoked when the resource is reported to idle until the specified [deadline]. * * @param ctx The context in which the provider runs. * @param deadline The deadline that was requested by the resource consumer. @@ -38,8 +38,8 @@ public interface SimResourceProviderLogic { public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. + * This method is invoked when the resource will be consumed until the specified amount of [work] was processed + * or [deadline] is reached. * * @param ctx The context in which the provider runs. * @param work The amount of work that was requested by the resource consumer. @@ -50,6 +50,14 @@ public interface SimResourceProviderLogic { public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long /** + * This method is invoked when the progress of the resource consumer is materialized. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + */ + public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {} + + /** * This method is invoked when the resource consumer has finished. */ public fun onFinish(ctx: SimResourceControllableContext) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index d984d2a5..9f062cc3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -37,21 +37,6 @@ public class SimResourceSource( private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null ) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { - /** - * The current processing speed of the resource. - */ - public val speed: Double - get() = ctx?.speed ?: 0.0 - - /** - * The capacity of the resource. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { @@ -62,6 +47,10 @@ public class SimResourceSource( return min(deadline, ctx.clock.millis() + getDuration(work, speed)) } + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + override fun onFinish(ctx: SimResourceControllableContext) { cancel() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index 53fec16a..e224285e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable { public val inputs: Set<SimResourceProvider> /** - * Add an output to the switch with the specified [capacity]. + * The resource counters to track the execution metrics of all switch resources. */ - public fun addOutput(capacity: Double): SimResourceProvider + public val counters: SimResourceCounters + + /** + * Create a new output on the switch. + */ + public fun newOutput(): SimResourceProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 7f1bb2b7..2950af80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override val inputs: Set<SimResourceProvider> get() = _inputs - override fun addOutput(capacity: Double): SimResourceProvider { + override val counters: SimResourceCounters = object : SimResourceCounters { + override val demand: Double + get() = _inputs.sumOf { it.counters.demand } + override val actual: Double + get() = _inputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _inputs.sumOf { it.counters.overcommit } + + override fun reset() { + for (input in _inputs) { + input.counters.reset() + } + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(capacity, forwarder) + val output = Provider(forwarder) _outputs += output return output } @@ -84,10 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { _inputs.forEach(SimResourceProvider::cancel) } - private inner class Provider( - private val capacity: Double, - private val forwarder: SimResourceTransformer - ) : SimResourceProvider by forwarder { + private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. _outputs -= this diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 61887e34..684a1b52 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -28,16 +28,25 @@ package org.opendc.simulator.resources */ public class SimResourceSwitchMaxMin( interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null, - private val listener: Listener? = null + parent: SimResourceSystem? = null ) : SimResourceSwitch { - private val _outputs = mutableSetOf<SimResourceProvider>() + /** + * The output resource providers to which resource consumers can be attached. + */ override val outputs: Set<SimResourceProvider> - get() = _outputs + get() = distributor.outputs - private val _inputs = mutableSetOf<SimResourceProvider>() + /** + * The input resources that will be switched between the output providers. + */ override val inputs: Set<SimResourceProvider> - get() = _inputs + get() = aggregator.inputs + + /** + * The resource counters to track the execution metrics of all switch resources. + */ + override val counters: SimResourceCounters + get() = aggregator.counters /** * A flag to indicate that the switch was closed. @@ -52,32 +61,19 @@ public class SimResourceSwitchMaxMin( /** * The distributor to distribute the aggregated resources. */ - private val distributor = SimResourceDistributorMaxMin( - aggregator.output, interpreter, parent, - object : SimResourceDistributorMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } - } - ) + private val distributor = SimResourceDistributorMaxMin(interpreter, parent) + + init { + aggregator.startConsumer(distributor) + } /** - * Add an output to the switch with the specified [capacity]. + * Add an output to the switch. */ - override fun addOutput(capacity: Double): SimResourceProvider { + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = distributor.addOutput(capacity) - _outputs.add(provider) - return provider + return distributor.newOutput() } /** @@ -92,26 +88,7 @@ public class SimResourceSwitchMaxMin( override fun close() { if (!isClosed) { isClosed = true - distributor.close() aggregator.close() } } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 32f3f573..fd3d1230 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + /** * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. * @@ -53,6 +55,19 @@ public class SimResourceTransformer( override var state: SimResourceState = SimResourceState.Pending private set + override val capacity: Double + get() = ctx?.capacity ?: 0.0 + + override val speed: Double + get() = ctx?.speed ?: 0.0 + + override val demand: Double + get() = ctx?.demand ?: 0.0 + + override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } @@ -97,10 +112,15 @@ public class SimResourceTransformer( start() } + updateCounters(ctx) + return if (state == SimResourceState.Stopped) { SimResourceCommand.Exit } else if (delegate != null) { val command = transform(ctx, delegate.onNext(ctx)) + + _work = if (command is SimResourceCommand.Consume) command.work else 0.0 + if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards @@ -169,6 +189,22 @@ public class SimResourceTransformer( state = SimResourceState.Pending } } + + /** + * Counter to track the current submitted work. + */ + private var _work = 0.0 + + /** + * Update the resource counters for the transformer. + */ + private fun updateCounters(ctx: SimResourceContext) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += _work + counters.actual += _work - remainingWork + counters.overcommit += remainingWork + } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 0b3f5de1..46c5c63f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -87,6 +87,26 @@ internal class SimResourceContextImpl( private var _speed = 0.0 /** + * The current resource processing demand. + */ + override val demand: Double + get() = _limit + + private val counters = object : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + /** * The current state of the resource context. */ private var _timestamp: Long = Long.MIN_VALUE @@ -145,7 +165,7 @@ internal class SimResourceContextImpl( return } - doUpdate(clock.millis()) + interpreter.scheduleSync(this) } /** @@ -206,6 +226,11 @@ internal class SimResourceContextImpl( val remainingWork = remainingWork val isConsume = _limit > 0.0 + // Update the resource counters only if there is some progress + if (timestamp > _timestamp) { + logic.onUpdate(this, _work) + } + // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource capacity cannot satisfy the demand. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt new file mode 100644 index 00000000..827019c5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.SimResourceCounters + +/** + * Mutable implementation of the [SimResourceCounters] interface. + */ +internal class SimResourceCountersImpl : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt index d09e1b45..cb0d6160 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -61,6 +61,11 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, private val futureInvocations = ArrayDeque<Invocation>() /** + * The systems that have been visited during the interpreter cycle. + */ + private val visited = linkedSetOf<SimResourceSystem>() + + /** * The index in the batch stack. */ private var batchIndex = 0 @@ -95,6 +100,30 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** + * Update the specified [ctx] synchronously. + */ + fun scheduleSync(ctx: SimResourceContextImpl) { + ctx.doUpdate(clock.millis()) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + + /** * Schedule the interpreter to run at [timestamp] to update the resource contexts. * * This method will override earlier calls to this method for the same [ctx]. @@ -148,7 +177,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations - val visited = linkedSetOf<SimResourceSystem>() + val visited = visited // Execute all scheduled updates at current timestamp while (true) { @@ -183,6 +212,8 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, for (system in visited) { system.onConverge(now) } + + visited.clear() } while (queue.isNotEmpty()) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 994ae888..51024e80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -59,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest { source.startConsumer(adapter) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertAll( @@ -67,7 +67,7 @@ internal class SimResourceAggregatorMaxMinTest { { assertEquals(listOf(0.0, 0.5, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @@ -87,14 +87,14 @@ internal class SimResourceAggregatorMaxMinTest { val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(adapter) + aggregator.consume(adapter) yield() assertAll( { assertEquals(1000, clock.millis()) }, { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @@ -115,13 +115,13 @@ internal class SimResourceAggregatorMaxMinTest { .andThen(SimResourceCommand.Exit) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertEquals(1000, clock.millis()) verify(exactly = 2) { consumer.onNext(any()) } } finally { - aggregator.output.close() + aggregator.close() } } @@ -142,11 +142,11 @@ internal class SimResourceAggregatorMaxMinTest { .andThenThrows(IllegalStateException("Test Exception")) try { - assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } + assertThrows<IllegalStateException> { aggregator.consume(consumer) } yield() assertEquals(SimResourceState.Pending, sources[0].state) } finally { - aggregator.output.close() + aggregator.close() } } @@ -164,14 +164,14 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(4.0, 1.0) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(1000) sources[0].capacity = 0.5 } yield() assertEquals(2334, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() } } @@ -189,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(1.0, 0.5) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(500) sources[0].capacity = 0.5 } yield() assertEquals(1000, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() + } + } + + @Test + fun testCounters() = runBlockingSimulation { + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(scheduler) + val sources = listOf( + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } + } finally { + aggregator.close() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 517dcb36..ad8d82e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -67,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest { source.startConsumer(adapter) switch.addInput(forwarder) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -98,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -142,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -170,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - switch.addOutput(3200.0) - assertThrows<IllegalStateException> { switch.addOutput(3200.0) } + switch.newOutput() + assertThrows<IllegalStateException> { switch.newOutput() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 0b023878..e4292ec0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -47,7 +47,7 @@ internal class SimResourceSwitchMaxMinTest { val sources = List(2) { SimResourceSource(2000.0, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(1000.0) + val provider = switch.newOutput() val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit @@ -67,26 +67,6 @@ internal class SimResourceSwitchMaxMinTest { fun testOvercommittedSingle() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - val duration = 5 * 60L val workload = SimTraceConsumer( @@ -98,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(scheduler, null, listener) - val provider = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val provider = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -110,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest { } assertAll( - { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -124,26 +104,6 @@ internal class SimResourceSwitchMaxMinTest { fun testOvercommittedDual() = runBlockingSimulation { val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - val duration = 5 * 60L val workloadA = SimTraceConsumer( @@ -164,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(scheduler, null, listener) - val providerA = switch.addOutput(3200.0) - val providerB = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val providerA = switch.newOutput() + val providerB = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -181,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest { switch.close() } assertAll( - { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 04886399..810052b8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -206,4 +206,21 @@ internal class SimResourceTransformerTest { assertEquals(0, clock.millis()) verify(exactly = 1) { consumer.onNext(any()) } } + + @Test + fun testCounters() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) + + val consumer = SimWorkConsumer(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } } |
