diff options
31 files changed, 704 insertions, 505 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index f499927d..4b96872b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -50,7 +50,6 @@ import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext -import kotlin.math.roundToLong /** * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. @@ -63,7 +62,7 @@ public class SimHost( context: CoroutineContext, engine: FlowEngine, meterProvider: MeterProvider, - hypervisor: SimHypervisorProvider, + hypervisorProvider: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), @@ -103,29 +102,8 @@ public class SimHost( /** * The hypervisor to run multiple workloads. */ - private val hypervisor: SimHypervisor = hypervisor.create( - engine, - scalingGovernor = scalingGovernor, - interferenceDomain = interferenceDomain, - listener = object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double - ) { - _cpuDemand = cpuDemand - _cpuUsage = cpuUsage - - collectTime() - } - } - ) - private var _cpuUsage = 0.0 - private var _cpuDemand = 0.0 + private val hypervisor: SimHypervisor = hypervisorProvider + .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain) /** * The virtual machines running on the hypervisor. @@ -157,22 +135,13 @@ public class SimHost( } } + /** + * The [Job] that represents the machine running the hypervisor. + */ + private var _job: Job? = null + init { - // Launch hypervisor onto machine - scope.launch { - try { - _bootTime = clock.millis() - _state = HostState.UP - machine.run(this@SimHost.hypervisor, emptyMap()) - } catch (_: CancellationException) { - // Ignored - } catch (cause: Throwable) { - logger.error(cause) { "Host failed" } - throw cause - } finally { - _state = HostState.DOWN - } - } + launch() meter.upDownCounterBuilder("system.guests") .setDescription("Number of guests on this host") @@ -184,15 +153,15 @@ public class SimHost( meter.gaugeBuilder("system.cpu.demand") .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") .setUnit("MHz") - .buildWithCallback { result -> result.observe(_cpuDemand) } + .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) } meter.gaugeBuilder("system.cpu.usage") .setDescription("Amount of CPU resources used by the host") .setUnit("MHz") - .buildWithCallback { result -> result.observe(_cpuUsage) } + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) } meter.gaugeBuilder("system.cpu.utilization") .setDescription("Utilization of the CPU resources of the host") .setUnit("%") - .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) } + .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) } meter.counterBuilder("system.cpu.time") .setDescription("Amount of CPU time spent by the host") .setUnit("s") @@ -200,16 +169,16 @@ public class SimHost( meter.gaugeBuilder("system.power.usage") .setDescription("Power usage of the host ") .setUnit("W") - .buildWithCallback { result -> result.observe(machine.powerDraw) } + .buildWithCallback { result -> result.observe(machine.powerUsage) } meter.counterBuilder("system.power.total") .setDescription("Amount of energy used by the CPU") .setUnit("J") .ofDoubles() - .buildWithCallback(::collectPowerTotal) + .buildWithCallback { result -> result.observe(machine.energyUsage) } meter.counterBuilder("system.time") .setDescription("The uptime of the host") .setUnit("s") - .buildWithCallback(::collectTime) + .buildWithCallback(::collectUptime) meter.gaugeBuilder("system.time.boot") .setDescription("The boot time of the host") .setUnit("1") @@ -290,9 +259,12 @@ public class SimHost( } public suspend fun recover() { - collectTime() - _state = HostState.UP - _bootTime = clock.millis() + updateUptime() + + launch() + + // Wait for the hypervisor to launch before recovering the guests + yield() for (guest in guests.values) { guest.recover() @@ -300,14 +272,42 @@ public class SimHost( } /** + * Launch the hypervisor. + */ + private fun launch() { + check(_job == null) { "Concurrent hypervisor running" } + + // Launch hypervisor onto machine + _job = scope.launch { + try { + _bootTime = clock.millis() + _state = HostState.UP + machine.run(hypervisor, emptyMap()) + } catch (_: CancellationException) { + // Ignored + } catch (cause: Throwable) { + logger.error(cause) { "Host failed" } + throw cause + } finally { + _state = HostState.DOWN + } + } + } + + /** * Reset the machine. */ private fun reset() { - collectTime() + updateUptime() + + // Stop the hypervisor + val job = _job + if (job != null) { + job.cancel() + _job = null + } _state = HostState.DOWN - _cpuUsage = 0.0 - _cpuDemand = 0.0 } /** @@ -385,85 +385,46 @@ public class SimHost( } } - private var _lastCpuTimeCallback = clock.millis() - - /** - * Helper function to track the CPU time of a machine. - */ - private fun collectCpuTime(result: ObservableLongMeasurement) { - val now = clock.millis() - val duration = now - _lastCpuTimeCallback - - try { - collectCpuTime(duration, result) - } finally { - _lastCpuTimeCallback = now - } - } - private val _activeState = Attributes.of(STATE_KEY, "active") private val _stealState = Attributes.of(STATE_KEY, "steal") private val _lostState = Attributes.of(STATE_KEY, "lost") private val _idleState = Attributes.of(STATE_KEY, "idle") - private var _totalTime = 0.0 /** * Helper function to track the CPU time of a machine. */ - private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { - val coreCount = this.model.cpuCount - val d = coreCount / _cpuLimit - + private fun collectCpuTime(result: ObservableLongMeasurement) { val counters = hypervisor.counters - val grantedWork = counters.actual - val overcommittedWork = counters.overcommit - val interferedWork = counters.interference - _totalTime += (duration / 1000.0) * coreCount - val activeTime = (grantedWork * d).roundToLong() - val idleTime = (_totalTime - grantedWork * d).roundToLong() - val stealTime = (overcommittedWork * d).roundToLong() - val lostTime = (interferedWork * d).roundToLong() - - result.observe(activeTime, _activeState) - result.observe(idleTime, _idleState) - result.observe(stealTime, _stealState) - result.observe(lostTime, _lostState) + result.observe(counters.cpuActiveTime / 1000L, _activeState) + result.observe(counters.cpuIdleTime / 1000L, _idleState) + result.observe(counters.cpuStealTime / 1000L, _stealState) + result.observe(counters.cpuLostTime / 1000L, _lostState) for (guest in guests.values) { - guest.collectCpuTime(duration, result) + guest.collectCpuTime(result) } } - private var _lastPowerCallback = clock.millis() - private var _totalPower = 0.0 - - /** - * Helper function to collect the total power usage of the machine. - */ - private fun collectPowerTotal(result: ObservableDoubleMeasurement) { - val now = clock.millis() - val duration = now - _lastPowerCallback - - _totalPower += duration / 1000.0 * machine.powerDraw - result.observe(_totalPower) - - _lastPowerCallback = now - } - private var _lastReport = clock.millis() /** * Helper function to track the uptime of a machine. */ - private fun collectTime(result: ObservableLongMeasurement? = null) { + private fun updateUptime() { val now = clock.millis() val duration = now - _lastReport + _lastReport = now - try { - collectTime(duration, result) - } finally { - _lastReport = now + if (_state == HostState.UP) { + _uptime += duration + } else if (_state == HostState.DOWN && scope.isActive) { + // Only increment downtime if the machine is in a failure state + _downtime += duration + } + + for (guest in guests.values) { + guest.updateUptime(duration) } } @@ -475,19 +436,14 @@ public class SimHost( /** * Helper function to track the uptime of a machine. */ - private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) { - if (state == HostState.UP) { - _uptime += duration - } else if (state == HostState.DOWN && scope.isActive) { - // Only increment downtime if the machine is in a failure state - _downtime += duration - } + private fun collectUptime(result: ObservableLongMeasurement) { + updateUptime() - result?.observe(_uptime, _upState) - result?.observe(_downtime, _downState) + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) for (guest in guests.values) { - guest.collectUptime(duration, result) + guest.collectUptime(result) } } @@ -496,9 +452,9 @@ public class SimHost( /** * Helper function to track the boot time of a machine. */ - private fun collectBootTime(result: ObservableLongMeasurement? = null) { + private fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result?.observe(_bootTime) + result.observe(_bootTime) } for (guest in guests.values) { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index eda76ba0..3ac165c8 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -33,12 +33,10 @@ import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimWorkloadMapper -import org.opendc.simulator.compute.SimAbstractMachine -import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.kernel.SimVirtualMachine import org.opendc.simulator.compute.workload.SimWorkload import java.time.Clock import kotlin.coroutines.CoroutineContext -import kotlin.math.roundToLong /** * A virtual machine instance that is managed by a [SimHost]. @@ -50,7 +48,7 @@ internal class Guest( private val mapper: SimWorkloadMapper, private val listener: GuestListener, val server: Server, - val machine: SimMachine + val machine: SimVirtualMachine ) { /** * The [CoroutineScope] of the guest. @@ -236,17 +234,22 @@ internal class Guest( .build() /** - * Helper function to track the uptime of the guest. + * Helper function to track the uptime and downtime of the guest. */ - fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) { + fun updateUptime(duration: Long) { if (state == ServerState.RUNNING) { _uptime += duration } else if (state == ServerState.ERROR) { _downtime += duration } + } - result?.observe(_uptime, _upState) - result?.observe(_downtime, _downState) + /** + * Helper function to track the uptime of the guest. + */ + fun collectUptime(result: ObservableLongMeasurement) { + result.observe(_uptime, _upState) + result.observe(_downtime, _downState) } private var _bootTime = Long.MIN_VALUE @@ -254,9 +257,9 @@ internal class Guest( /** * Helper function to track the boot time of the guest. */ - fun collectBootTime(result: ObservableLongMeasurement? = null) { + fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result?.observe(_bootTime) + result.observe(_bootTime) } } @@ -276,33 +279,17 @@ internal class Guest( .putAll(attributes) .put(STATE_KEY, "idle") .build() - private var _totalTime = 0.0 /** * Helper function to track the CPU time of a machine. */ - fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) { - val coreCount = server.flavor.cpuCount - val d = coreCount / _cpuLimit - - var grantedWork = 0.0 - var overcommittedWork = 0.0 - - for (cpu in (machine as SimAbstractMachine).cpus) { - val counters = cpu.counters - grantedWork += counters.actual - overcommittedWork += counters.overcommit - } - - _totalTime += (duration / 1000.0) * coreCount - val activeTime = (grantedWork * d).roundToLong() - val idleTime = (_totalTime - grantedWork * d).roundToLong() - val stealTime = (overcommittedWork * d).roundToLong() + fun collectCpuTime(result: ObservableLongMeasurement) { + val counters = machine.counters - result.observe(activeTime, _activeState) - result.observe(idleTime, _idleState) - result.observe(stealTime, _stealState) - result.observe(0, _lostState) + result.observe(counters.cpuActiveTime / 1000, _activeState) + result.observe(counters.cpuIdleTime / 1000, _idleState) + result.observe(counters.cpuStealTime / 1000, _stealState) + result.observe(counters.cpuLostTime / 1000, _lostState) } private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index d2293be7..26089b6d 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -170,9 +170,9 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(659, activeTime, "Active time does not match") }, - { assertEquals(2342, idleTime, "Idle time does not match") }, - { assertEquals(638, stealTime, "Steal time does not match") }, + { assertEquals(658, activeTime, "Active time does not match") }, + { assertEquals(1741, idleTime, "Idle time does not match") }, + { assertEquals(637, stealTime, "Steal time does not match") }, { assertEquals(1500001, clock.millis()) } ) } @@ -253,7 +253,7 @@ internal class SimHostTest { host.spawn(server) delay(5000L) host.fail() - delay(5000L) + delay(duration * 1000) host.recover() suspendCancellableCoroutine<Unit> { cont -> @@ -274,12 +274,12 @@ internal class SimHostTest { reader.close() assertAll( - { assertEquals(2661, idleTime, "Idle time does not match") }, - { assertEquals(339, activeTime, "Active time does not match") }, - { assertEquals(1195001, uptime, "Uptime does not match") }, - { assertEquals(5000, downtime, "Downtime does not match") }, - { assertEquals(1195000, guestUptime, "Guest uptime does not match") }, - { assertEquals(5000, guestDowntime, "Guest downtime does not match") }, + { assertEquals(1175, idleTime, "Idle time does not match") }, + { assertEquals(624, activeTime, "Active time does not match") }, + { assertEquals(900001, uptime, "Uptime does not match") }, + { assertEquals(300000, downtime, "Downtime does not match") }, + { assertEquals(900000, guestUptime, "Guest uptime does not match") }, + { assertEquals(300000, guestDowntime, "Guest downtime does not match") }, ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 67d39ffa..9d540118 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -116,11 +116,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223327751, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(67009849, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3155964, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.840207707767459E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -160,10 +160,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10998184, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740216, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.009945802750012E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -209,9 +210,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6009751, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14728649, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12526520, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(480866, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -252,9 +253,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(11133606, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9604794, exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(1311, exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } ) diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index c57919c1..d654d58a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -92,7 +92,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine) + val hypervisor = SimSpaceSharedHypervisor(engine, null, null) launch { machine.run(hypervisor) } @@ -113,7 +113,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine) + val hypervisor = SimFairShareHypervisor(engine, null, null, null) launch { machine.run(hypervisor) } @@ -134,7 +134,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine) + val hypervisor = SimFairShareHypervisor(engine, null, null, null) launch { machine.run(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 0bcf5957..9140d31b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -28,6 +28,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.flow.* import org.opendc.simulator.flow.FlowEngine +import kotlin.math.max /** * A simulated bare-metal machine that is able to run a single workload. @@ -49,10 +50,18 @@ public class SimBareMetalMachine( parent: FlowConvergenceListener? = null, ) : SimAbstractMachine(engine, parent, model) { /** - * The power draw of the machine onto the PSU. + * The current power usage of the machine (without PSU loss) in W. */ - public val powerDraw: Double - get() = powerDriverLogic.computePower() + public val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + /** + * The total energy usage of the machine (without PSU loss) in Joules. + */ + public val energyUsage: Double + get() = _energyUsage + private var _energyUsage = 0.0 /** * The processing units of the machine. @@ -66,8 +75,20 @@ public class SimBareMetalMachine( */ private val powerDriverLogic = powerDriver.createLogic(this, cpus) + private var _lastConverge = Long.MAX_VALUE + override fun onConverge(now: Long, delta: Long) { + // Update the PSU stage psu.update() + + val lastConverge = _lastConverge + _lastConverge = now + val duration = max(0, now - lastConverge) + if (duration > 0) { + // Compute the power and energy usage of the machine + _energyUsage += _powerUsage * (duration / 1000.0) + _powerUsage = powerDriverLogic.computePower() + } } init { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index bcba8e8e..aac8b959 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -30,6 +30,7 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.flow.* import org.opendc.simulator.flow.mux.FlowMultiplexer +import kotlin.math.roundToLong /** * Abstract implementation of the [SimHypervisor] interface. @@ -39,18 +40,19 @@ import org.opendc.simulator.flow.mux.FlowMultiplexer */ public abstract class SimAbstractHypervisor( protected val engine: FlowEngine, - private val scalingGovernor: ScalingGovernor? = null, + private val listener: FlowConvergenceListener?, + private val scalingGovernor: ScalingGovernor?, protected val interferenceDomain: VmInterferenceDomain? = null -) : SimHypervisor { +) : SimHypervisor, FlowConvergenceListener { /** * The machine on which the hypervisor runs. */ - private lateinit var context: SimMachineContext + protected lateinit var context: SimMachineContext /** * The resource switch to use. */ - private lateinit var mux: FlowMultiplexer + protected abstract val mux: FlowMultiplexer /** * The virtual machines running on this hypervisor. @@ -62,39 +64,73 @@ public abstract class SimAbstractHypervisor( /** * The resource counters associated with the hypervisor. */ - public override val counters: FlowCounters - get() = mux.counters + public override val counters: SimHypervisorCounters + get() = _counters + private val _counters = object : SimHypervisorCounters { + @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity + + override var cpuActiveTime: Long = 0L + override var cpuIdleTime: Long = 0L + override var cpuStealTime: Long = 0L + override var cpuLostTime: Long = 0L + + private var _previousDemand = 0.0 + private var _previousActual = 0.0 + private var _previousRemaining = 0.0 + private var _previousInterference = 0.0 + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val counters = mux.counters + val demand = counters.demand + val actual = counters.actual + val remaining = counters.remaining + val interference = counters.interference + + val demandDelta = demand - _previousDemand + val actualDelta = actual - _previousActual + val remainingDelta = remaining - _previousRemaining + val interferenceDelta = interference - _previousInterference + + _previousDemand = demand + _previousActual = actual + _previousRemaining = remaining + _previousInterference = interference + + cpuActiveTime += (actualDelta * d).roundToLong() + cpuIdleTime += (remainingDelta * d).roundToLong() + cpuStealTime += ((demandDelta - actualDelta) * d).roundToLong() + cpuLostTime += (interferenceDelta * d).roundToLong() + } + } /** - * The scaling governors attached to the physical CPUs backing this hypervisor. + * The CPU capacity of the hypervisor in MHz. */ - private val governors = mutableListOf<ScalingGovernor.Logic>() + override val cpuCapacity: Double + get() = mux.capacity /** - * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs. + * The CPU demand of the hypervisor in MHz. */ - public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer + override val cpuDemand: Double + get() = mux.demand /** - * Check whether the specified machine model fits on this hypervisor. + * The CPU usage of the hypervisor in MHz. */ - public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean + override val cpuUsage: Double + get() = mux.rate /** - * Trigger the governors to recompute the scaling limits. + * The scaling governors attached to the physical CPUs backing this hypervisor. */ - protected fun triggerGovernors(load: Double) { - for (governor in governors) { - governor.onLimit(load) - } - } + private val governors = mutableListOf<ScalingGovernor.Logic>() /* SimHypervisor */ - override fun canFit(model: MachineModel): Boolean { - return canFit(model, mux) - } - - override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine { + override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceId) _vms.add(vm) @@ -104,7 +140,13 @@ public abstract class SimAbstractHypervisor( /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx - mux = createMultiplexer(ctx) + + _cpuCount = ctx.cpus.size + _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } + _counters.d = _cpuCount / _cpuCapacity * 1000L + + // Clear the existing outputs of the multiplexer + mux.clearOutputs() for (cpu in ctx.cpus) { val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) @@ -113,16 +155,31 @@ public abstract class SimAbstractHypervisor( governor.onStart() } - mux.addOutput(cpu) + cpu.startConsumer(mux.newOutput()) } } + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + + /* FlowConvergenceListener */ + override fun onConverge(now: Long, delta: Long) { + _counters.record() + + val load = cpuDemand / cpuCapacity + for (governor in governors) { + governor.onLimit(load) + } + + listener?.onConverge(now, delta) + } + /** * A virtual machine running on the hypervisor. * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) { + private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { /** * The interference key of this virtual machine. */ @@ -133,6 +190,41 @@ public abstract class SimAbstractHypervisor( */ override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } + /** + * The resource counters associated with the hypervisor. + */ + override val counters: SimHypervisorCounters + get() = _counters + private val _counters = object : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + override val cpuIdleTime: Long + get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + override val cpuStealTime: Long + get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + override val cpuLostTime: Long = 0L + } + + /** + * The CPU capacity of the hypervisor in MHz. + */ + override val cpuCapacity: Double + get() = cpus.sumOf(FlowConsumer::capacity) + + /** + * The CPU demand of the hypervisor in MHz. + */ + override val cpuDemand: Double + get() = cpus.sumOf(FlowConsumer::demand) + + /** + * The CPU usage of the hypervisor in MHz. + */ + override val cpuUsage: Double + get() = cpus.sumOf(FlowConsumer::rate) + override fun close() { super.close() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index b0515c6e..36f76650 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel @@ -38,64 +37,20 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer * concurrently using weighted fair sharing. * * @param engine The [FlowEngine] to manage the machine's resources. - * @param parent The parent simulation system. + * @param listener The listener for the convergence of the system. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. * @param interferenceDomain The resource interference domain to which the hypervisor belongs. - * @param listener The hypervisor listener to use. */ public class SimFairShareHypervisor( engine: FlowEngine, - private val parent: FlowConvergenceListener? = null, - scalingGovernor: ScalingGovernor? = null, - interferenceDomain: VmInterferenceDomain? = null, - private val listener: SimHypervisor.Listener? = null -) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) { - - override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true - - override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer { - return SwitchSystem(ctx).switch - } - - private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowConvergenceListener { - val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain) - - private var lastCpuUsage = 0.0 - private var lastCpuDemand = 0.0 - private var lastDemand = 0.0 - private var lastActual = 0.0 - private var lastOvercommit = 0.0 - private var lastInterference = 0.0 - private var lastReport = Long.MIN_VALUE - - override fun onConverge(now: Long, delta: Long) { - val listener = listener ?: return - val counters = switch.counters - - if (now > lastReport) { - listener.onSliceFinish( - this@SimFairShareHypervisor, - counters.demand - lastDemand, - counters.actual - lastActual, - counters.overcommit - lastOvercommit, - counters.interference - lastInterference, - lastCpuUsage, - lastCpuDemand - ) - } - lastReport = now - - lastCpuDemand = switch.outputs.sumOf { it.demand } - lastCpuUsage = switch.outputs.sumOf { it.rate } - lastDemand = counters.demand - lastActual = counters.actual - lastOvercommit = counters.overcommit - lastInterference = counters.interference - - val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency } - triggerGovernors(load) - - parent?.onConverge(now, delta) - } - } + listener: FlowConvergenceListener?, + scalingGovernor: ScalingGovernor?, + interferenceDomain: VmInterferenceDomain?, +) : SimAbstractHypervisor(engine, listener, scalingGovernor, interferenceDomain) { + /** + * The multiplexer that distributes the computing capacity. + */ + override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain) + + override fun canFit(model: MachineModel): Boolean = true } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index e0a70926..3136f4c8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -35,15 +35,8 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, - parent: FlowConvergenceListener?, + listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, - listener: SimHypervisor.Listener? - ): SimHypervisor = SimFairShareHypervisor( - engine, - parent, - scalingGovernor = scalingGovernor, - interferenceDomain = interferenceDomain, - listener = listener - ) + ): SimHypervisor = SimFairShareHypervisor(engine, listener, scalingGovernor, interferenceDomain) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 1b11ca6b..57d4cf20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowCounters /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload @@ -40,7 +39,22 @@ public interface SimHypervisor : SimWorkload { /** * The resource counters associated with the hypervisor. */ - public val counters: FlowCounters + public val counters: SimHypervisorCounters + + /** + * The CPU usage of the hypervisor in MHz. + */ + public val cpuUsage: Double + + /** + * The CPU usage of the hypervisor in MHz. + */ + public val cpuDemand: Double + + /** + * The CPU capacity of the hypervisor in MHz. + */ + public val cpuCapacity: Double /** * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. @@ -53,23 +67,5 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceId An identifier for the interference model. */ - public fun createMachine(model: MachineModel, interferenceId: String? = null): SimMachine - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - hypervisor: SimHypervisor, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double - ) - } + public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt new file mode 100644 index 00000000..030d9c5f --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel + +/** + * Performance counters of a [SimHypervisor]. + */ +public interface SimHypervisorCounters { + /** + * The amount of time (in milliseconds) the CPUs of the hypervisor were actively running. + */ + public val cpuActiveTime: Long + + /** + * The amount of time (in milliseconds) the CPUs of the hypervisor were idle. + */ + public val cpuIdleTime: Long + + /** + * The amount of CPU time (in milliseconds) that virtual machines were ready to run, but were not able to. + */ + public val cpuStealTime: Long + + /** + * The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines. + */ + public val cpuLostTime: Long +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index dad2cc3b..483217af 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -40,13 +40,12 @@ public interface SimHypervisorProvider { public val id: String /** - * Create a [SimHypervisor] instance with the specified [listener]. + * Create a new [SimHypervisor] instance. */ public fun create( engine: FlowEngine, - parent: FlowConvergenceListener? = null, + listener: FlowConvergenceListener? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, - listener: SimHypervisor.Listener? = null ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 883e0d82..82f8df38 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -22,8 +22,9 @@ package org.opendc.simulator.compute.kernel -import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.flow.mux.FlowMultiplexer import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer @@ -31,12 +32,14 @@ import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) { - override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean { - return switch.outputs.size - switch.inputs.size >= model.cpus.size - } +public class SimSpaceSharedHypervisor( + engine: FlowEngine, + listener: FlowConvergenceListener?, + scalingGovernor: ScalingGovernor?, +) : SimAbstractHypervisor(engine, listener, scalingGovernor) { + override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine) - override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer { - return ForwardingFlowMultiplexer(engine) + override fun canFit(model: MachineModel): Boolean { + return mux.outputs.size - mux.inputs.size >= model.cpus.size } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index 93921eb9..dd6fb0b1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -35,9 +35,8 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, - parent: FlowConvergenceListener?, + listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, - listener: SimHypervisor.Listener? - ): SimHypervisor = SimSpaceSharedHypervisor(engine) + ): SimHypervisor = SimSpaceSharedHypervisor(engine, listener, scalingGovernor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt new file mode 100644 index 00000000..36219ef2 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel + +import org.opendc.simulator.compute.SimMachine + +/** + * A virtual [SimMachine] running on top of another [SimMachine]. + */ +public interface SimVirtualMachine : SimMachine { + /** + * The resource counters associated with the virtual machine. + */ + public val counters: SimHypervisorCounters + + /** + * The CPU usage of the VM in MHz. + */ + public val cpuUsage: Double + + /** + * The CPU usage of the VM in MHz. + */ + public val cpuDemand: Double + + /** + * The CPU capacity of the VM in MHz. + */ + public val cpuCapacity: Double +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 058d5d28..9db2e6ec 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -46,7 +46,7 @@ import org.opendc.simulator.flow.FlowEngine * Test suite for the [SimHypervisor] class. */ @OptIn(ExperimentalCoroutinesApi::class) -internal class SimHypervisorTest { +internal class SimFairShareHypervisorTest { private lateinit var model: MachineModel @BeforeEach @@ -63,26 +63,6 @@ internal class SimHypervisorTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val listener = object : SimHypervisor.Listener { - var totalRequestedWork = 0.0 - var totalGrantedWork = 0.0 - var totalOvercommittedWork = 0.0 - - override fun onSliceFinish( - hypervisor: SimHypervisor, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += totalWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - val duration = 5 * 60L val workloadA = SimTraceWorkload( @@ -96,7 +76,7 @@ internal class SimHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener) + val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) launch { machine.run(hypervisor) @@ -111,9 +91,9 @@ internal class SimHypervisorTest { machine.close() assertAll( - { assertEquals(1113300.0, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300.0, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") }, + { assertEquals(880219, hypervisor.counters.cpuIdleTime, "Idle time does not match") }, + { assertEquals(28125, hypervisor.counters.cpuStealTime, "Steal time does not match") }, { assertEquals(1200000, clock.millis()) { "Current time is correct" } } ) } @@ -123,26 +103,6 @@ internal class SimHypervisorTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val listener = object : SimHypervisor.Listener { - var totalRequestedWork = 0.0 - var totalGrantedWork = 0.0 - var totalOvercommittedWork = 0.0 - - override fun onSliceFinish( - hypervisor: SimHypervisor, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += totalWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } - val duration = 5 * 60L val workloadA = SimTraceWorkload( @@ -167,7 +127,7 @@ internal class SimHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, listener = listener) + val hypervisor = SimFairShareHypervisor(platform, null, null, null) launch { machine.run(hypervisor) @@ -189,9 +149,9 @@ internal class SimHypervisorTest { yield() assertAll( - { assertEquals(2073600.0, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1053600.0, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(329250, hypervisor.counters.cpuActiveTime, "Active time does not match") }, + { assertEquals(870750, hypervisor.counters.cpuIdleTime, "Idle time does not match") }, + { assertEquals(318750, hypervisor.counters.cpuStealTime, "Steal time does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -205,10 +165,8 @@ internal class SimHypervisorTest { ) val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimFairShareHypervisor(platform) + val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimFairShareHypervisor(platform, null, null, null) assertDoesNotThrow { launch { @@ -238,7 +196,7 @@ internal class SimHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, interferenceDomain = interferenceModel.newDomain()) + val hypervisor = SimFairShareHypervisor(platform, null, null, interferenceModel.newDomain()) val duration = 5 * 60L val workloadA = diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 95fb6679..b05ffd22 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -75,10 +75,8 @@ internal class SimSpaceSharedHypervisorTest { ) val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(engine) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimSpaceSharedHypervisor(engine, null, null) launch { machine.run(hypervisor) } val vm = hypervisor.createMachine(machineModel) @@ -99,10 +97,8 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(engine) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimSpaceSharedHypervisor(engine, null, null) launch { machine.run(hypervisor) } yield() @@ -125,7 +121,7 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine) + val hypervisor = SimSpaceSharedHypervisor(engine, null, null) launch { machine.run(hypervisor) } yield() @@ -146,7 +142,7 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine) + val hypervisor = SimSpaceSharedHypervisor(engine, null, null) launch { machine.run(hypervisor) } yield() @@ -172,7 +168,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine) + val hypervisor = SimSpaceSharedHypervisor(engine, null, null) launch { machine.run(hypervisor) } yield() @@ -196,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) launch { machine.run(hypervisor) } yield() diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index 4834f10f..e927f81d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -83,8 +83,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -96,8 +96,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(3) { launch { @@ -113,8 +113,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -126,8 +126,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(2) { launch { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index c8092082..b02426e3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * The previous demand for the consumer. */ - private var previousDemand = 0.0 + private var _previousDemand = 0.0 + private var _previousCapacity = 0.0 /** * Update the counters of the flow consumer. */ protected fun updateCounters(ctx: FlowConnection, delta: Long) { - val demand = previousDemand - previousDemand = ctx.demand + val demand = _previousDemand + val capacity = _previousCapacity + + _previousDemand = ctx.demand + _previousCapacity = ctx.capacity if (delta <= 0) { return @@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi val counters = _counters val deltaS = delta / 1000.0 - val work = demand * deltaS + val total = demand * deltaS + val work = capacity * deltaS val actualWork = ctx.rate * deltaS - val remainingWork = work - actualWork counters.demand += work counters.actual += actualWork - counters.overcommit += remainingWork + counters.remaining += (total - actualWork) } /** * Update the counters of the flow consumer. */ - protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters.demand += demand counters.actual += actual - counters.overcommit += overcommit + counters.remaining += remaining } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index e15d7643..a717ae6e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -37,9 +37,9 @@ public interface FlowCounters { public val actual: Double /** - * The accumulated flow that could not be transferred over the connection. + * The amount of capacity that was not utilized. */ - public val overcommit: Double + public val remaining: Double /** * The accumulated flow lost due to interference between sources. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 17de601a..7eaaf6c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val counters = _counters val deltaS = delta / 1000.0 + val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork - counters.overcommit += (work - actualWork) + counters.remaining += (total - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 141d335d..d2fa5228 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 - override var overcommit: Double = 0.0 + override var remaining: Double = 0.0 override var interference: Double = 0.0 override fun reset() { demand = 0.0 actual = 0.0 - overcommit = 0.0 + remaining = 0.0 interference = 0.0 } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 17b82391..04ba7f21 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -39,7 +39,22 @@ public interface FlowMultiplexer { /** * The outputs of the multiplexer over which the flows will be distributed. */ - public val outputs: Set<FlowConsumer> + public val outputs: Set<FlowSource> + + /** + * The actual processing rate of the multiplexer. + */ + public val rate: Double + + /** + * The demanded processing rate of the input. + */ + public val demand: Double + + /** + * The capacity of the outputs. + */ + public val capacity: Double /** * The flow counters to track the flow metrics of all multiplexer inputs. @@ -59,12 +74,27 @@ public interface FlowMultiplexer { public fun removeInput(input: FlowConsumer) /** - * Add the specified [output] to the multiplexer. + * Create a new output on this multiplexer. */ - public fun addOutput(output: FlowConsumer) + public fun newOutput(): FlowSource /** - * Clear all inputs and outputs from the switch. + * Remove [output] from this multiplexer. + */ + public fun removeOutput(output: FlowSource) + + /** + * Clear all inputs and outputs from the multiplexer. */ public fun clear() + + /** + * Clear the inputs of the multiplexer. + */ + public fun clearInputs() + + /** + * Clear the outputs of the multiplexer. + */ + public fun clearOutputs() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 6dd9dcfb..125d10fe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul get() = _inputs private val _inputs = mutableSetOf<Input>() - override val outputs: Set<FlowConsumer> + override val outputs: Set<FlowSource> get() = _outputs - private val _outputs = mutableSetOf<FlowConsumer>() - private val _availableOutputs = ArrayDeque<FlowForwarder>() + private val _outputs = mutableSetOf<Output>() + private val _availableOutputs = ArrayDeque<Output>() override val counters: FlowCounters = object : FlowCounters { override val demand: Double - get() = _outputs.sumOf { it.counters.demand } + get() = _outputs.sumOf { it.forwarder.counters.demand } override val actual: Double - get() = _outputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _outputs.sumOf { it.counters.overcommit } + get() = _outputs.sumOf { it.forwarder.counters.actual } + override val remaining: Double + get() = _outputs.sumOf { it.forwarder.counters.remaining } override val interference: Double - get() = _outputs.sumOf { it.counters.interference } + get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { - for (input in _outputs) { - input.counters.reset() + for (output in _outputs) { + output.forwarder.counters.reset() } } - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } + override val rate: Double + get() = _outputs.sumOf { it.forwarder.rate } + + override val demand: Double + get() = _outputs.sumOf { it.forwarder.demand } + + override val capacity: Double + get() = _outputs.sumOf { it.forwarder.capacity } + override fun newInput(key: InterferenceKey?): FlowConsumer { - val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val output = Input(forwarder) - _inputs += output - return output + val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val input = Input(output) + _inputs += input + return input } override fun removeInput(input: FlowConsumer) { @@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return } - (input as Input).close() + val output = (input as Input).output + output.forwarder.cancel() + _availableOutputs += output } - override fun addOutput(output: FlowConsumer) { - if (output in outputs) { - return - } - + override fun newOutput(): FlowSource { val forwarder = FlowForwarder(engine) + val output = Output(forwarder) _outputs += output - _availableOutputs += forwarder + return output + } - output.startConsumer(object : FlowSource by forwarder { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _outputs -= output + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } - forwarder.onStop(conn, now, delta) - } - }) + val forwarder = (output as Output).forwarder + forwarder.close() } - override fun clear() { - for (input in _outputs) { - input.cancel() + override fun clearInputs() { + for (input in _inputs) { + val output = input.output + output.forwarder.cancel() + _availableOutputs += output } - _outputs.clear() - // Inputs are implicitly cancelled by the output forwarders _inputs.clear() } + override fun clearOutputs() { + for (output in _outputs) { + output.forwarder.cancel() + } + _outputs.clear() + _availableOutputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() + } + /** * An input on the multiplexer. */ - private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { - /** - * Close the input. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across input resources. - _inputs -= this - _availableOutputs += forwarder + private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } + + /** + * An output on the multiplexer. + */ + private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { + override fun onStart(conn: FlowConnection, now: Long) { + _availableOutputs += this + forwarder.onStart(conn, now) } - override fun toString(): String = "ForwardingFlowMultiplexer.Input" + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + forwarder.cancel() + forwarder.onStop(conn, now, delta) + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Output" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7232df35..5ff0fb8d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer( /** * The outputs of the multiplexer. */ - override val outputs: Set<FlowConsumer> + override val outputs: Set<FlowSource> get() = _outputs - private val _outputs = mutableSetOf<FlowConsumer>() + private val _outputs = mutableSetOf<Output>() private val _activeOutputs = mutableListOf<Output>() /** @@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer( /** * The actual processing rate of the multiplexer. */ + public override val rate: Double + get() = _rate private var _rate = 0.0 /** * The demanded processing rate of the input. */ + public override val demand: Double + get() = _demand private var _demand = 0.0 /** * The capacity of the outputs. */ + public override val capacity: Double + get() = _capacity private var _capacity = 0.0 /** * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) @@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer( return provider } - override fun addOutput(output: FlowConsumer) { - val consumer = Output(output) - if (_outputs.add(output)) { - _activeOutputs.add(consumer) - output.startConsumer(consumer) - } - } - override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return @@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer( (input as Input).close() } - override fun clear() { - for (input in _activeOutputs) { + override fun newOutput(): FlowSource { + val output = Output() + _outputs.add(output) + return output + } + + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } + + // This cast should always succeed since only `Output` instances should be added to `_outputs` + (output as Output).cancel() + } + + override fun clearInputs() { + for (input in _inputs) { input.cancel() } - _activeOutputs.clear() + _inputs.clear() + } - for (output in _activeInputs) { + override fun clearOutputs() { + for (output in _outputs) { output.cancel() } - _activeInputs.clear() + _outputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() } /** @@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer( if (_schedulerActive) { return } - + val lastSchedulerCycle = _lastSchedulerCycle + val delta = max(0, now - lastSchedulerCycle) _schedulerActive = true + _lastSchedulerCycle = now + try { - doSchedule(now) + doSchedule(now, delta) } finally { _schedulerActive = false } @@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. */ - private fun doSchedule(now: Long) { + private fun doSchedule(now: Long, delta: Long) { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + // Update the counters of the scheduler + updateCounters(delta) + // If there is no work yet, mark the inputs as idle. if (activeInputs.isEmpty()) { + _demand = 0.0 + _rate = 0.0 return } @@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer( // Remove outputs that have finished if (!input.isActive) { + input.actualRate = 0.0 inputIterator.remove() } } @@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer( // Divide the available output capacity fairly over the inputs using max-min fair sharing var remaining = activeInputs.size - for (input in activeInputs) { + for (i in activeInputs.indices) { + val input = activeInputs[i] val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) @@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer( activeOutputs.sort() // Divide the requests over the available capacity of the input resources fairly - for (output in activeOutputs) { + for (i in activeOutputs.indices) { + val output = activeOutputs[i] val inputCapacity = output.capacity val fraction = inputCapacity / capacity val grantedSpeed = rate * fraction @@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer( } /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 + + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = _capacity + + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 + + _counters.demand += _demand * deltaS + _counters.actual += _rate * deltaS + _counters.remaining += (previousCapacity - _rate) * deltaS + } + + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ private inner class Input(capacity: Double, val key: InterferenceKey?) : @@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer( private var _lastPull: Long = Long.MIN_VALUE /** + * The interference domain this input belongs to. + */ + private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + + /** * Close the input. * * This method is invoked when the user removes an input from the switch. @@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this - if (parent != null) { ctx.shouldConsumerConverge = true } @@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer( doUpdateCounters(delta) actualRate = 0.0 - this.limit = rate + limit = rate _lastPull = now runScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + val lastConverge = _lastConverge + val parent = parent + + if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { + _lastConverge = now + _lastConvergeInput = this + + parent.onConverge(now, max(0, now - lastConverge)) + } } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 _lastPull = now + + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == this) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + runScheduler(now) } /* Comparable */ @@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity + val load = _rate / _capacity interferenceDomain.apply(key, load) } else { 1.0 } val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork + val demand = limit * deltaS + val actual = actualRate * deltaS + val remaining = (capacity - actualRate) * deltaS - updateCounters(work, actualWork, remainingWork) + updateCounters(demand, actual, remaining) - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) + _counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> { + private inner class Output : FlowSource, Comparable<Output> { /** * The active [FlowConnection] of this source. */ - private var _ctx: FlowConnection? = null + private var _conn: FlowConnection? = null /** * The capacity of this output. @@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer( * Push the specified rate to the consumer. */ fun push(rate: Double) { - _ctx?.push(rate) + _conn?.push(rate) } /** * Cancel this output. */ fun cancel() { - provider.cancel() + _conn?.close() } override fun onStart(conn: FlowConnection, now: Long) { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn + assert(_conn == null) { "Source running concurrently" } + _conn = conn capacity = conn.capacity + _activeOutputs.add(this) + updateCapacity() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _ctx = null + _conn = null capacity = 0.0 + _activeOutputs.remove(this) + updateCapacity() + + runScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer( updateCapacity() } + // Re-run scheduler to distribute new load runScheduler(now) return Long.MAX_VALUE } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d548451f..12e72b8f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -217,7 +217,7 @@ internal class FlowForwarderTest { assertEquals(2.0, source.counters.actual) assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } assertEquals(2000, clock.millis()) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 3475f027..187dacd9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource /** * Test suite for the [ForwardingFlowMultiplexer] class. */ -internal class ExclusiveFlowMultiplexerTest { +internal class ForwardingFlowMultiplexerTest { /** * Test a trace workload. */ @@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest { val forwarder = FlowForwarder(engine) val adapter = FlowSourceRateAdapter(forwarder, speed::add) source.startConsumer(adapter) - switch.addOutput(forwarder) + forwarder.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) switch.newInput() assertThrows<IllegalStateException> { switch.newInput() } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt index 9f6b8a2c..6e2cdb98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest { val switch = MaxMinFlowMultiplexer(scheduler) val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } + sources.forEach { it.startConsumer(switch.newOutput()) } val provider = switch.newInput() val consumer = FixedFlowSource(2000.0, 1.0) @@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val provider = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) provider.consume(workload) yield() } finally { @@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val providerA = switch.newInput() val providerB = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) coroutineScope { launch { providerA.consume(workloadA) } @@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt index 2b7c1ad7..6667c80c 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt @@ -63,11 +63,9 @@ public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetwor get() = _provider private val _provider = mux.newInput() - override fun createConsumer(): FlowSource { - val forwarder = FlowForwarder(engine, isCoupled = true) - mux.addOutput(forwarder) - return forwarder - } + private val _source = mux.newOutput() + + override fun createConsumer(): FlowSource = _source override fun close() { isClosed = true diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index d536f22d..9f88fecc 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -46,18 +46,14 @@ public class SimPdu( /** * The [FlowForwarder] that represents the input of the PDU. */ - private val forwarder = FlowForwarder(engine) + private val output = mux.newOutput() /** * Create a new PDU outlet. */ public fun newOutlet(): Outlet = Outlet(mux, mux.newInput()) - init { - mux.addOutput(forwarder) - } - - override fun createSource(): FlowSource = FlowMapper(forwarder) { _, rate -> + override fun createSource(): FlowSource = FlowMapper(output) { _, rate -> val loss = computePowerLoss(rate) rate + loss } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt index 312f1d0f..46d659f8 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt @@ -42,19 +42,19 @@ public class SimUps( /** * The resource aggregator used to combine the input sources. */ - private val switch = MaxMinFlowMultiplexer(engine) + private val mux = MaxMinFlowMultiplexer(engine) /** * The [FlowConsumer] that represents the output of the UPS. */ - private val provider = switch.newInput() + private val provider = mux.newInput() /** * Create a new UPS outlet. */ public fun newInlet(): SimPowerInlet { val forward = FlowForwarder(engine, isCoupled = true) - switch.addOutput(forward) + forward.startConsumer(mux.newOutput()) return Inlet(forward) } |
