summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-01 22:04:35 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:41 +0200
commit081221684fb826ab5a00c1d8cc5a9886b9e2203c (patch)
tree7f2202429256b4cb96812f96b682a021f8236180
parent6e424e9b44687d01e618e7bc38afc427610cd845 (diff)
feat(simulator): Expose CPU time counters directly on hypervisor
This change adds a new interface to the SimHypervisor interface that exposes the CPU time counters directly. These are derived from the flow counters and will be used by SimHost to expose them via telemetry.
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt196
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt51
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt20
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt27
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt27
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt144
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt67
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt11
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt48
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt17
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt50
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt)66
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt114
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt164
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt)10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt8
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt8
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt6
31 files changed, 704 insertions, 505 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index f499927d..4b96872b 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -50,7 +50,6 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -63,7 +62,7 @@ public class SimHost(
context: CoroutineContext,
engine: FlowEngine,
meterProvider: MeterProvider,
- hypervisor: SimHypervisorProvider,
+ hypervisorProvider: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -103,29 +102,8 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- private val hypervisor: SimHypervisor = hypervisor.create(
- engine,
- scalingGovernor = scalingGovernor,
- interferenceDomain = interferenceDomain,
- listener = object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- _cpuDemand = cpuDemand
- _cpuUsage = cpuUsage
-
- collectTime()
- }
- }
- )
- private var _cpuUsage = 0.0
- private var _cpuDemand = 0.0
+ private val hypervisor: SimHypervisor = hypervisorProvider
+ .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain)
/**
* The virtual machines running on the hypervisor.
@@ -157,22 +135,13 @@ public class SimHost(
}
}
+ /**
+ * The [Job] that represents the machine running the hypervisor.
+ */
+ private var _job: Job? = null
+
init {
- // Launch hypervisor onto machine
- scope.launch {
- try {
- _bootTime = clock.millis()
- _state = HostState.UP
- machine.run(this@SimHost.hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
- }
- }
+ launch()
meter.upDownCounterBuilder("system.guests")
.setDescription("Number of guests on this host")
@@ -184,15 +153,15 @@ public class SimHost(
meter.gaugeBuilder("system.cpu.demand")
.setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(_cpuDemand) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) }
meter.gaugeBuilder("system.cpu.usage")
.setDescription("Amount of CPU resources used by the host")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(_cpuUsage) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) }
meter.gaugeBuilder("system.cpu.utilization")
.setDescription("Utilization of the CPU resources of the host")
.setUnit("%")
- .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) }
meter.counterBuilder("system.cpu.time")
.setDescription("Amount of CPU time spent by the host")
.setUnit("s")
@@ -200,16 +169,16 @@ public class SimHost(
meter.gaugeBuilder("system.power.usage")
.setDescription("Power usage of the host ")
.setUnit("W")
- .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ .buildWithCallback { result -> result.observe(machine.powerUsage) }
meter.counterBuilder("system.power.total")
.setDescription("Amount of energy used by the CPU")
.setUnit("J")
.ofDoubles()
- .buildWithCallback(::collectPowerTotal)
+ .buildWithCallback { result -> result.observe(machine.energyUsage) }
meter.counterBuilder("system.time")
.setDescription("The uptime of the host")
.setUnit("s")
- .buildWithCallback(::collectTime)
+ .buildWithCallback(::collectUptime)
meter.gaugeBuilder("system.time.boot")
.setDescription("The boot time of the host")
.setUnit("1")
@@ -290,9 +259,12 @@ public class SimHost(
}
public suspend fun recover() {
- collectTime()
- _state = HostState.UP
- _bootTime = clock.millis()
+ updateUptime()
+
+ launch()
+
+ // Wait for the hypervisor to launch before recovering the guests
+ yield()
for (guest in guests.values) {
guest.recover()
@@ -300,14 +272,42 @@ public class SimHost(
}
/**
+ * Launch the hypervisor.
+ */
+ private fun launch() {
+ check(_job == null) { "Concurrent hypervisor running" }
+
+ // Launch hypervisor onto machine
+ _job = scope.launch {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ machine.run(hypervisor, emptyMap())
+ } catch (_: CancellationException) {
+ // Ignored
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Host failed" }
+ throw cause
+ } finally {
+ _state = HostState.DOWN
+ }
+ }
+ }
+
+ /**
* Reset the machine.
*/
private fun reset() {
- collectTime()
+ updateUptime()
+
+ // Stop the hypervisor
+ val job = _job
+ if (job != null) {
+ job.cancel()
+ _job = null
+ }
_state = HostState.DOWN
- _cpuUsage = 0.0
- _cpuDemand = 0.0
}
/**
@@ -385,85 +385,46 @@ public class SimHost(
}
}
- private var _lastCpuTimeCallback = clock.millis()
-
- /**
- * Helper function to track the CPU time of a machine.
- */
- private fun collectCpuTime(result: ObservableLongMeasurement) {
- val now = clock.millis()
- val duration = now - _lastCpuTimeCallback
-
- try {
- collectCpuTime(duration, result)
- } finally {
- _lastCpuTimeCallback = now
- }
- }
-
private val _activeState = Attributes.of(STATE_KEY, "active")
private val _stealState = Attributes.of(STATE_KEY, "steal")
private val _lostState = Attributes.of(STATE_KEY, "lost")
private val _idleState = Attributes.of(STATE_KEY, "idle")
- private var _totalTime = 0.0
/**
* Helper function to track the CPU time of a machine.
*/
- private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
- val coreCount = this.model.cpuCount
- val d = coreCount / _cpuLimit
-
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = hypervisor.counters
- val grantedWork = counters.actual
- val overcommittedWork = counters.overcommit
- val interferedWork = counters.interference
- _totalTime += (duration / 1000.0) * coreCount
- val activeTime = (grantedWork * d).roundToLong()
- val idleTime = (_totalTime - grantedWork * d).roundToLong()
- val stealTime = (overcommittedWork * d).roundToLong()
- val lostTime = (interferedWork * d).roundToLong()
-
- result.observe(activeTime, _activeState)
- result.observe(idleTime, _idleState)
- result.observe(stealTime, _stealState)
- result.observe(lostTime, _lostState)
+ result.observe(counters.cpuActiveTime / 1000L, _activeState)
+ result.observe(counters.cpuIdleTime / 1000L, _idleState)
+ result.observe(counters.cpuStealTime / 1000L, _stealState)
+ result.observe(counters.cpuLostTime / 1000L, _lostState)
for (guest in guests.values) {
- guest.collectCpuTime(duration, result)
+ guest.collectCpuTime(result)
}
}
- private var _lastPowerCallback = clock.millis()
- private var _totalPower = 0.0
-
- /**
- * Helper function to collect the total power usage of the machine.
- */
- private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
- val now = clock.millis()
- val duration = now - _lastPowerCallback
-
- _totalPower += duration / 1000.0 * machine.powerDraw
- result.observe(_totalPower)
-
- _lastPowerCallback = now
- }
-
private var _lastReport = clock.millis()
/**
* Helper function to track the uptime of a machine.
*/
- private fun collectTime(result: ObservableLongMeasurement? = null) {
+ private fun updateUptime() {
val now = clock.millis()
val duration = now - _lastReport
+ _lastReport = now
- try {
- collectTime(duration, result)
- } finally {
- _lastReport = now
+ if (_state == HostState.UP) {
+ _uptime += duration
+ } else if (_state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
+ }
+
+ for (guest in guests.values) {
+ guest.updateUptime(duration)
}
}
@@ -475,19 +436,14 @@ public class SimHost(
/**
* Helper function to track the uptime of a machine.
*/
- private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
- if (state == HostState.UP) {
- _uptime += duration
- } else if (state == HostState.DOWN && scope.isActive) {
- // Only increment downtime if the machine is in a failure state
- _downtime += duration
- }
+ private fun collectUptime(result: ObservableLongMeasurement) {
+ updateUptime()
- result?.observe(_uptime, _upState)
- result?.observe(_downtime, _downState)
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
for (guest in guests.values) {
- guest.collectUptime(duration, result)
+ guest.collectUptime(result)
}
}
@@ -496,9 +452,9 @@ public class SimHost(
/**
* Helper function to track the boot time of a machine.
*/
- private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ private fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result?.observe(_bootTime)
+ result.observe(_bootTime)
}
for (guest in guests.values) {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index eda76ba0..3ac165c8 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -33,12 +33,10 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
-import org.opendc.simulator.compute.SimAbstractMachine
-import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.kernel.SimVirtualMachine
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
import kotlin.coroutines.CoroutineContext
-import kotlin.math.roundToLong
/**
* A virtual machine instance that is managed by a [SimHost].
@@ -50,7 +48,7 @@ internal class Guest(
private val mapper: SimWorkloadMapper,
private val listener: GuestListener,
val server: Server,
- val machine: SimMachine
+ val machine: SimVirtualMachine
) {
/**
* The [CoroutineScope] of the guest.
@@ -236,17 +234,22 @@ internal class Guest(
.build()
/**
- * Helper function to track the uptime of the guest.
+ * Helper function to track the uptime and downtime of the guest.
*/
- fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ fun updateUptime(duration: Long) {
if (state == ServerState.RUNNING) {
_uptime += duration
} else if (state == ServerState.ERROR) {
_downtime += duration
}
+ }
- result?.observe(_uptime, _upState)
- result?.observe(_downtime, _downState)
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(result: ObservableLongMeasurement) {
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
}
private var _bootTime = Long.MIN_VALUE
@@ -254,9 +257,9 @@ internal class Guest(
/**
* Helper function to track the boot time of the guest.
*/
- fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result?.observe(_bootTime)
+ result.observe(_bootTime)
}
}
@@ -276,33 +279,17 @@ internal class Guest(
.putAll(attributes)
.put(STATE_KEY, "idle")
.build()
- private var _totalTime = 0.0
/**
* Helper function to track the CPU time of a machine.
*/
- fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
- val coreCount = server.flavor.cpuCount
- val d = coreCount / _cpuLimit
-
- var grantedWork = 0.0
- var overcommittedWork = 0.0
-
- for (cpu in (machine as SimAbstractMachine).cpus) {
- val counters = cpu.counters
- grantedWork += counters.actual
- overcommittedWork += counters.overcommit
- }
-
- _totalTime += (duration / 1000.0) * coreCount
- val activeTime = (grantedWork * d).roundToLong()
- val idleTime = (_totalTime - grantedWork * d).roundToLong()
- val stealTime = (overcommittedWork * d).roundToLong()
+ fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = machine.counters
- result.observe(activeTime, _activeState)
- result.observe(idleTime, _idleState)
- result.observe(stealTime, _stealState)
- result.observe(0, _lostState)
+ result.observe(counters.cpuActiveTime / 1000, _activeState)
+ result.observe(counters.cpuIdleTime / 1000, _idleState)
+ result.observe(counters.cpuStealTime / 1000, _stealState)
+ result.observe(counters.cpuLostTime / 1000, _lostState)
}
private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index d2293be7..26089b6d 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -170,9 +170,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(659, activeTime, "Active time does not match") },
- { assertEquals(2342, idleTime, "Idle time does not match") },
- { assertEquals(638, stealTime, "Steal time does not match") },
+ { assertEquals(658, activeTime, "Active time does not match") },
+ { assertEquals(1741, idleTime, "Idle time does not match") },
+ { assertEquals(637, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -253,7 +253,7 @@ internal class SimHostTest {
host.spawn(server)
delay(5000L)
host.fail()
- delay(5000L)
+ delay(duration * 1000)
host.recover()
suspendCancellableCoroutine<Unit> { cont ->
@@ -274,12 +274,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2661, idleTime, "Idle time does not match") },
- { assertEquals(339, activeTime, "Active time does not match") },
- { assertEquals(1195001, uptime, "Uptime does not match") },
- { assertEquals(5000, downtime, "Downtime does not match") },
- { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
- { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
+ { assertEquals(1175, idleTime, "Idle time does not match") },
+ { assertEquals(624, activeTime, "Active time does not match") },
+ { assertEquals(900001, uptime, "Uptime does not match") },
+ { assertEquals(300000, downtime, "Downtime does not match") },
+ { assertEquals(900000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(300000, guestDowntime, "Guest downtime does not match") },
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 67d39ffa..9d540118 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -116,11 +116,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223327751, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(67009849, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3155964, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.840207707767459E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -160,10 +160,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10998184, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9740216, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.009945802750012E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -209,9 +210,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6009751, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14728649, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12526520, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(480866, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -252,9 +253,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(11133606, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9604794, exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(1311, exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
{ assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index c57919c1..d654d58a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -92,7 +92,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
@@ -113,7 +113,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(engine)
+ val hypervisor = SimFairShareHypervisor(engine, null, null, null)
launch { machine.run(hypervisor) }
@@ -134,7 +134,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(engine)
+ val hypervisor = SimFairShareHypervisor(engine, null, null, null)
launch { machine.run(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 0bcf5957..9140d31b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -28,6 +28,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.FlowEngine
+import kotlin.math.max
/**
* A simulated bare-metal machine that is able to run a single workload.
@@ -49,10 +50,18 @@ public class SimBareMetalMachine(
parent: FlowConvergenceListener? = null,
) : SimAbstractMachine(engine, parent, model) {
/**
- * The power draw of the machine onto the PSU.
+ * The current power usage of the machine (without PSU loss) in W.
*/
- public val powerDraw: Double
- get() = powerDriverLogic.computePower()
+ public val powerUsage: Double
+ get() = _powerUsage
+ private var _powerUsage = 0.0
+
+ /**
+ * The total energy usage of the machine (without PSU loss) in Joules.
+ */
+ public val energyUsage: Double
+ get() = _energyUsage
+ private var _energyUsage = 0.0
/**
* The processing units of the machine.
@@ -66,8 +75,20 @@ public class SimBareMetalMachine(
*/
private val powerDriverLogic = powerDriver.createLogic(this, cpus)
+ private var _lastConverge = Long.MAX_VALUE
+
override fun onConverge(now: Long, delta: Long) {
+ // Update the PSU stage
psu.update()
+
+ val lastConverge = _lastConverge
+ _lastConverge = now
+ val duration = max(0, now - lastConverge)
+ if (duration > 0) {
+ // Compute the power and energy usage of the machine
+ _energyUsage += _powerUsage * (duration / 1000.0)
+ _powerUsage = powerDriverLogic.computePower()
+ }
}
init {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index bcba8e8e..aac8b959 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -30,6 +30,7 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.mux.FlowMultiplexer
+import kotlin.math.roundToLong
/**
* Abstract implementation of the [SimHypervisor] interface.
@@ -39,18 +40,19 @@ import org.opendc.simulator.flow.mux.FlowMultiplexer
*/
public abstract class SimAbstractHypervisor(
protected val engine: FlowEngine,
- private val scalingGovernor: ScalingGovernor? = null,
+ private val listener: FlowConvergenceListener?,
+ private val scalingGovernor: ScalingGovernor?,
protected val interferenceDomain: VmInterferenceDomain? = null
-) : SimHypervisor {
+) : SimHypervisor, FlowConvergenceListener {
/**
* The machine on which the hypervisor runs.
*/
- private lateinit var context: SimMachineContext
+ protected lateinit var context: SimMachineContext
/**
* The resource switch to use.
*/
- private lateinit var mux: FlowMultiplexer
+ protected abstract val mux: FlowMultiplexer
/**
* The virtual machines running on this hypervisor.
@@ -62,39 +64,73 @@ public abstract class SimAbstractHypervisor(
/**
* The resource counters associated with the hypervisor.
*/
- public override val counters: FlowCounters
- get() = mux.counters
+ public override val counters: SimHypervisorCounters
+ get() = _counters
+ private val _counters = object : SimHypervisorCounters {
+ @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity
+
+ override var cpuActiveTime: Long = 0L
+ override var cpuIdleTime: Long = 0L
+ override var cpuStealTime: Long = 0L
+ override var cpuLostTime: Long = 0L
+
+ private var _previousDemand = 0.0
+ private var _previousActual = 0.0
+ private var _previousRemaining = 0.0
+ private var _previousInterference = 0.0
+
+ /**
+ * Record the CPU time of the hypervisor.
+ */
+ fun record() {
+ val counters = mux.counters
+ val demand = counters.demand
+ val actual = counters.actual
+ val remaining = counters.remaining
+ val interference = counters.interference
+
+ val demandDelta = demand - _previousDemand
+ val actualDelta = actual - _previousActual
+ val remainingDelta = remaining - _previousRemaining
+ val interferenceDelta = interference - _previousInterference
+
+ _previousDemand = demand
+ _previousActual = actual
+ _previousRemaining = remaining
+ _previousInterference = interference
+
+ cpuActiveTime += (actualDelta * d).roundToLong()
+ cpuIdleTime += (remainingDelta * d).roundToLong()
+ cpuStealTime += ((demandDelta - actualDelta) * d).roundToLong()
+ cpuLostTime += (interferenceDelta * d).roundToLong()
+ }
+ }
/**
- * The scaling governors attached to the physical CPUs backing this hypervisor.
+ * The CPU capacity of the hypervisor in MHz.
*/
- private val governors = mutableListOf<ScalingGovernor.Logic>()
+ override val cpuCapacity: Double
+ get() = mux.capacity
/**
- * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs.
+ * The CPU demand of the hypervisor in MHz.
*/
- public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer
+ override val cpuDemand: Double
+ get() = mux.demand
/**
- * Check whether the specified machine model fits on this hypervisor.
+ * The CPU usage of the hypervisor in MHz.
*/
- public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean
+ override val cpuUsage: Double
+ get() = mux.rate
/**
- * Trigger the governors to recompute the scaling limits.
+ * The scaling governors attached to the physical CPUs backing this hypervisor.
*/
- protected fun triggerGovernors(load: Double) {
- for (governor in governors) {
- governor.onLimit(load)
- }
- }
+ private val governors = mutableListOf<ScalingGovernor.Logic>()
/* SimHypervisor */
- override fun canFit(model: MachineModel): Boolean {
- return canFit(model, mux)
- }
-
- override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine {
+ override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
require(canFit(model)) { "Machine does not fit" }
val vm = VirtualMachine(model, interferenceId)
_vms.add(vm)
@@ -104,7 +140,13 @@ public abstract class SimAbstractHypervisor(
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
- mux = createMultiplexer(ctx)
+
+ _cpuCount = ctx.cpus.size
+ _cpuCapacity = ctx.cpus.sumOf { it.model.frequency }
+ _counters.d = _cpuCount / _cpuCapacity * 1000L
+
+ // Clear the existing outputs of the multiplexer
+ mux.clearOutputs()
for (cpu in ctx.cpus) {
val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu))
@@ -113,16 +155,31 @@ public abstract class SimAbstractHypervisor(
governor.onStart()
}
- mux.addOutput(cpu)
+ cpu.startConsumer(mux.newOutput())
}
}
+ private var _cpuCount = 0
+ private var _cpuCapacity = 0.0
+
+ /* FlowConvergenceListener */
+ override fun onConverge(now: Long, delta: Long) {
+ _counters.record()
+
+ val load = cpuDemand / cpuCapacity
+ for (governor in governors) {
+ governor.onLimit(load)
+ }
+
+ listener?.onConverge(now, delta)
+ }
+
/**
* A virtual machine running on the hypervisor.
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) {
+ private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
/**
* The interference key of this virtual machine.
*/
@@ -133,6 +190,41 @@ public abstract class SimAbstractHypervisor(
*/
override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) }
+ /**
+ * The resource counters associated with the hypervisor.
+ */
+ override val counters: SimHypervisorCounters
+ get() = _counters
+ private val _counters = object : SimHypervisorCounters {
+ private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
+
+ override val cpuActiveTime: Long
+ get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
+ override val cpuIdleTime: Long
+ get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
+ override val cpuStealTime: Long
+ get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
+ override val cpuLostTime: Long = 0L
+ }
+
+ /**
+ * The CPU capacity of the hypervisor in MHz.
+ */
+ override val cpuCapacity: Double
+ get() = cpus.sumOf(FlowConsumer::capacity)
+
+ /**
+ * The CPU demand of the hypervisor in MHz.
+ */
+ override val cpuDemand: Double
+ get() = cpus.sumOf(FlowConsumer::demand)
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ override val cpuUsage: Double
+ get() = cpus.sumOf(FlowConsumer::rate)
+
override fun close() {
super.close()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index b0515c6e..36f76650 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
-import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
@@ -38,64 +37,20 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
* concurrently using weighted fair sharing.
*
* @param engine The [FlowEngine] to manage the machine's resources.
- * @param parent The parent simulation system.
+ * @param listener The listener for the convergence of the system.
* @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
* @param interferenceDomain The resource interference domain to which the hypervisor belongs.
- * @param listener The hypervisor listener to use.
*/
public class SimFairShareHypervisor(
engine: FlowEngine,
- private val parent: FlowConvergenceListener? = null,
- scalingGovernor: ScalingGovernor? = null,
- interferenceDomain: VmInterferenceDomain? = null,
- private val listener: SimHypervisor.Listener? = null
-) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) {
-
- override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true
-
- override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
- return SwitchSystem(ctx).switch
- }
-
- private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowConvergenceListener {
- val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
-
- private var lastCpuUsage = 0.0
- private var lastCpuDemand = 0.0
- private var lastDemand = 0.0
- private var lastActual = 0.0
- private var lastOvercommit = 0.0
- private var lastInterference = 0.0
- private var lastReport = Long.MIN_VALUE
-
- override fun onConverge(now: Long, delta: Long) {
- val listener = listener ?: return
- val counters = switch.counters
-
- if (now > lastReport) {
- listener.onSliceFinish(
- this@SimFairShareHypervisor,
- counters.demand - lastDemand,
- counters.actual - lastActual,
- counters.overcommit - lastOvercommit,
- counters.interference - lastInterference,
- lastCpuUsage,
- lastCpuDemand
- )
- }
- lastReport = now
-
- lastCpuDemand = switch.outputs.sumOf { it.demand }
- lastCpuUsage = switch.outputs.sumOf { it.rate }
- lastDemand = counters.demand
- lastActual = counters.actual
- lastOvercommit = counters.overcommit
- lastInterference = counters.interference
-
- val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency }
- triggerGovernors(load)
-
- parent?.onConverge(now, delta)
- }
- }
+ listener: FlowConvergenceListener?,
+ scalingGovernor: ScalingGovernor?,
+ interferenceDomain: VmInterferenceDomain?,
+) : SimAbstractHypervisor(engine, listener, scalingGovernor, interferenceDomain) {
+ /**
+ * The multiplexer that distributes the computing capacity.
+ */
+ override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
+
+ override fun canFit(model: MachineModel): Boolean = true
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
index e0a70926..3136f4c8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
@@ -35,15 +35,8 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override fun create(
engine: FlowEngine,
- parent: FlowConvergenceListener?,
+ listener: FlowConvergenceListener?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
- listener: SimHypervisor.Listener?
- ): SimHypervisor = SimFairShareHypervisor(
- engine,
- parent,
- scalingGovernor = scalingGovernor,
- interferenceDomain = interferenceDomain,
- listener = listener
- )
+ ): SimHypervisor = SimFairShareHypervisor(engine, listener, scalingGovernor, interferenceDomain)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 1b11ca6b..57d4cf20 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow.FlowCounters
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
@@ -40,7 +39,22 @@ public interface SimHypervisor : SimWorkload {
/**
* The resource counters associated with the hypervisor.
*/
- public val counters: FlowCounters
+ public val counters: SimHypervisorCounters
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU capacity of the hypervisor in MHz.
+ */
+ public val cpuCapacity: Double
/**
* Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
@@ -53,23 +67,5 @@ public interface SimHypervisor : SimWorkload {
* @param model The machine to create.
* @param interferenceId An identifier for the interference model.
*/
- public fun createMachine(model: MachineModel, interferenceId: String? = null): SimMachine
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
+ public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
new file mode 100644
index 00000000..030d9c5f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel
+
+/**
+ * Performance counters of a [SimHypervisor].
+ */
+public interface SimHypervisorCounters {
+ /**
+ * The amount of time (in milliseconds) the CPUs of the hypervisor were actively running.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The amount of time (in milliseconds) the CPUs of the hypervisor were idle.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The amount of CPU time (in milliseconds) that virtual machines were ready to run, but were not able to.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
index dad2cc3b..483217af 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
@@ -40,13 +40,12 @@ public interface SimHypervisorProvider {
public val id: String
/**
- * Create a [SimHypervisor] instance with the specified [listener].
+ * Create a new [SimHypervisor] instance.
*/
public fun create(
engine: FlowEngine,
- parent: FlowConvergenceListener? = null,
+ listener: FlowConvergenceListener? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
- listener: SimHypervisor.Listener? = null
): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
index 883e0d82..82f8df38 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
@@ -22,8 +22,9 @@
package org.opendc.simulator.compute.kernel
-import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.flow.FlowConvergenceListener
import org.opendc.simulator.flow.FlowEngine
import org.opendc.simulator.flow.mux.FlowMultiplexer
import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
@@ -31,12 +32,14 @@ import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) {
- override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean {
- return switch.outputs.size - switch.inputs.size >= model.cpus.size
- }
+public class SimSpaceSharedHypervisor(
+ engine: FlowEngine,
+ listener: FlowConvergenceListener?,
+ scalingGovernor: ScalingGovernor?,
+) : SimAbstractHypervisor(engine, listener, scalingGovernor) {
+ override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine)
- override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
- return ForwardingFlowMultiplexer(engine)
+ override fun canFit(model: MachineModel): Boolean {
+ return mux.outputs.size - mux.inputs.size >= model.cpus.size
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
index 93921eb9..dd6fb0b1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
@@ -35,9 +35,8 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override fun create(
engine: FlowEngine,
- parent: FlowConvergenceListener?,
+ listener: FlowConvergenceListener?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
- listener: SimHypervisor.Listener?
- ): SimHypervisor = SimSpaceSharedHypervisor(engine)
+ ): SimHypervisor = SimSpaceSharedHypervisor(engine, listener, scalingGovernor)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt
new file mode 100644
index 00000000..36219ef2
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel
+
+import org.opendc.simulator.compute.SimMachine
+
+/**
+ * A virtual [SimMachine] running on top of another [SimMachine].
+ */
+public interface SimVirtualMachine : SimMachine {
+ /**
+ * The resource counters associated with the virtual machine.
+ */
+ public val counters: SimHypervisorCounters
+
+ /**
+ * The CPU usage of the VM in MHz.
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The CPU usage of the VM in MHz.
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU capacity of the VM in MHz.
+ */
+ public val cpuCapacity: Double
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 058d5d28..9db2e6ec 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -46,7 +46,7 @@ import org.opendc.simulator.flow.FlowEngine
* Test suite for the [SimHypervisor] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimHypervisorTest {
+internal class SimFairShareHypervisorTest {
private lateinit var model: MachineModel
@BeforeEach
@@ -63,26 +63,6 @@ internal class SimHypervisorTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val listener = object : SimHypervisor.Listener {
- var totalRequestedWork = 0.0
- var totalGrantedWork = 0.0
- var totalOvercommittedWork = 0.0
-
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += totalWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
@@ -96,7 +76,7 @@ internal class SimHypervisorTest {
val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null)
launch {
machine.run(hypervisor)
@@ -111,9 +91,9 @@ internal class SimHypervisorTest {
machine.close()
assertAll(
- { assertEquals(1113300.0, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300.0, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") },
+ { assertEquals(880219, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
+ { assertEquals(28125, hypervisor.counters.cpuStealTime, "Steal time does not match") },
{ assertEquals(1200000, clock.millis()) { "Current time is correct" } }
)
}
@@ -123,26 +103,6 @@ internal class SimHypervisorTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val listener = object : SimHypervisor.Listener {
- var totalRequestedWork = 0.0
- var totalGrantedWork = 0.0
- var totalOvercommittedWork = 0.0
-
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += totalWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
@@ -167,7 +127,7 @@ internal class SimHypervisorTest {
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(platform, listener = listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, null, null)
launch {
machine.run(hypervisor)
@@ -189,9 +149,9 @@ internal class SimHypervisorTest {
yield()
assertAll(
- { assertEquals(2073600.0, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1053600.0, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(329250, hypervisor.counters.cpuActiveTime, "Active time does not match") },
+ { assertEquals(870750, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
+ { assertEquals(318750, hypervisor.counters.cpuStealTime, "Steal time does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -205,10 +165,8 @@ internal class SimHypervisorTest {
)
val platform = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimFairShareHypervisor(platform)
+ val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimFairShareHypervisor(platform, null, null, null)
assertDoesNotThrow {
launch {
@@ -238,7 +196,7 @@ internal class SimHypervisorTest {
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(platform, interferenceDomain = interferenceModel.newDomain())
+ val hypervisor = SimFairShareHypervisor(platform, null, null, interferenceModel.newDomain())
val duration = 5 * 60L
val workloadA =
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 95fb6679..b05ffd22 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -75,10 +75,8 @@ internal class SimSpaceSharedHypervisorTest {
)
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
val vm = hypervisor.createMachine(machineModel)
@@ -99,10 +97,8 @@ internal class SimSpaceSharedHypervisorTest {
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -125,7 +121,7 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -146,7 +142,7 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -172,7 +168,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testConcurrentWorkloadFails() = runBlockingSimulation {
val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -196,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(
interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null)
launch { machine.run(hypervisor) }
yield()
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index 4834f10f..e927f81d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -83,8 +83,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -96,8 +96,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(3) {
launch {
@@ -113,8 +113,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -126,8 +126,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(2) {
launch {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index c8092082..b02426e3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
/**
* The previous demand for the consumer.
*/
- private var previousDemand = 0.0
+ private var _previousDemand = 0.0
+ private var _previousCapacity = 0.0
/**
* Update the counters of the flow consumer.
*/
protected fun updateCounters(ctx: FlowConnection, delta: Long) {
- val demand = previousDemand
- previousDemand = ctx.demand
+ val demand = _previousDemand
+ val capacity = _previousCapacity
+
+ _previousDemand = ctx.demand
+ _previousCapacity = ctx.capacity
if (delta <= 0) {
return
@@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
val counters = _counters
val deltaS = delta / 1000.0
- val work = demand * deltaS
+ val total = demand * deltaS
+ val work = capacity * deltaS
val actualWork = ctx.rate * deltaS
- val remainingWork = work - actualWork
counters.demand += work
counters.actual += actualWork
- counters.overcommit += remainingWork
+ counters.remaining += (total - actualWork)
}
/**
* Update the counters of the flow consumer.
*/
- protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
+ protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
val counters = _counters
counters.demand += demand
counters.actual += actual
- counters.overcommit += overcommit
+ counters.remaining += remaining
}
final override fun startConsumer(source: FlowSource) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index e15d7643..a717ae6e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -37,9 +37,9 @@ public interface FlowCounters {
public val actual: Double
/**
- * The accumulated flow that could not be transferred over the connection.
+ * The amount of capacity that was not utilized.
*/
- public val overcommit: Double
+ public val remaining: Double
/**
* The accumulated flow lost due to interference between sources.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 17de601a..7eaaf6c2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
val counters = _counters
val deltaS = delta / 1000.0
+ val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
counters.demand += work
counters.actual += actualWork
- counters.overcommit += (work - actualWork)
+ counters.remaining += (total - actualWork)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
index 141d335d..d2fa5228 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters
internal class FlowCountersImpl : FlowCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
- override var overcommit: Double = 0.0
+ override var remaining: Double = 0.0
override var interference: Double = 0.0
override fun reset() {
demand = 0.0
actual = 0.0
- overcommit = 0.0
+ remaining = 0.0
interference = 0.0
}
override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 17b82391..04ba7f21 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -39,7 +39,22 @@ public interface FlowMultiplexer {
/**
* The outputs of the multiplexer over which the flows will be distributed.
*/
- public val outputs: Set<FlowConsumer>
+ public val outputs: Set<FlowSource>
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ public val rate: Double
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ public val demand: Double
+
+ /**
+ * The capacity of the outputs.
+ */
+ public val capacity: Double
/**
* The flow counters to track the flow metrics of all multiplexer inputs.
@@ -59,12 +74,27 @@ public interface FlowMultiplexer {
public fun removeInput(input: FlowConsumer)
/**
- * Add the specified [output] to the multiplexer.
+ * Create a new output on this multiplexer.
*/
- public fun addOutput(output: FlowConsumer)
+ public fun newOutput(): FlowSource
/**
- * Clear all inputs and outputs from the switch.
+ * Remove [output] from this multiplexer.
+ */
+ public fun removeOutput(output: FlowSource)
+
+ /**
+ * Clear all inputs and outputs from the multiplexer.
*/
public fun clear()
+
+ /**
+ * Clear the inputs of the multiplexer.
+ */
+ public fun clearInputs()
+
+ /**
+ * Clear the outputs of the multiplexer.
+ */
+ public fun clearOutputs()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 6dd9dcfb..125d10fe 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
get() = _inputs
private val _inputs = mutableSetOf<Input>()
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
- private val _availableOutputs = ArrayDeque<FlowForwarder>()
+ private val _outputs = mutableSetOf<Output>()
+ private val _availableOutputs = ArrayDeque<Output>()
override val counters: FlowCounters = object : FlowCounters {
override val demand: Double
- get() = _outputs.sumOf { it.counters.demand }
+ get() = _outputs.sumOf { it.forwarder.counters.demand }
override val actual: Double
- get() = _outputs.sumOf { it.counters.actual }
- override val overcommit: Double
- get() = _outputs.sumOf { it.counters.overcommit }
+ get() = _outputs.sumOf { it.forwarder.counters.actual }
+ override val remaining: Double
+ get() = _outputs.sumOf { it.forwarder.counters.remaining }
override val interference: Double
- get() = _outputs.sumOf { it.counters.interference }
+ get() = _outputs.sumOf { it.forwarder.counters.interference }
override fun reset() {
- for (input in _outputs) {
- input.counters.reset()
+ for (output in _outputs) {
+ output.forwarder.counters.reset()
}
}
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
}
+ override val rate: Double
+ get() = _outputs.sumOf { it.forwarder.rate }
+
+ override val demand: Double
+ get() = _outputs.sumOf { it.forwarder.demand }
+
+ override val capacity: Double
+ get() = _outputs.sumOf { it.forwarder.capacity }
+
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val output = Input(forwarder)
- _inputs += output
- return output
+ val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val input = Input(output)
+ _inputs += input
+ return input
}
override fun removeInput(input: FlowConsumer) {
@@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
return
}
- (input as Input).close()
+ val output = (input as Input).output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- override fun addOutput(output: FlowConsumer) {
- if (output in outputs) {
- return
- }
-
+ override fun newOutput(): FlowSource {
val forwarder = FlowForwarder(engine)
+ val output = Output(forwarder)
_outputs += output
- _availableOutputs += forwarder
+ return output
+ }
- output.startConsumer(object : FlowSource by forwarder {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _outputs -= output
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
- forwarder.onStop(conn, now, delta)
- }
- })
+ val forwarder = (output as Output).forwarder
+ forwarder.close()
}
- override fun clear() {
- for (input in _outputs) {
- input.cancel()
+ override fun clearInputs() {
+ for (input in _inputs) {
+ val output = input.output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- _outputs.clear()
- // Inputs are implicitly cancelled by the output forwarders
_inputs.clear()
}
+ override fun clearOutputs() {
+ for (output in _outputs) {
+ output.forwarder.cancel()
+ }
+ _outputs.clear()
+ _availableOutputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
+ }
+
/**
* An input on the multiplexer.
*/
- private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
- /**
- * Close the input.
- */
- fun close() {
- // We explicitly do not close the forwarder here in order to re-use it across input resources.
- _inputs -= this
- _availableOutputs += forwarder
+ private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+
+ /**
+ * An output on the multiplexer.
+ */
+ private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _availableOutputs += this
+ forwarder.onStart(conn, now)
}
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ forwarder.cancel()
+ forwarder.onStop(conn, now, delta)
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Output"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 7232df35..5ff0fb8d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer(
/**
* The outputs of the multiplexer.
*/
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _outputs = mutableSetOf<Output>()
private val _activeOutputs = mutableListOf<Output>()
/**
@@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer(
/**
* The actual processing rate of the multiplexer.
*/
+ public override val rate: Double
+ get() = _rate
private var _rate = 0.0
/**
* The demanded processing rate of the input.
*/
+ public override val demand: Double
+ get() = _demand
private var _demand = 0.0
/**
* The capacity of the outputs.
*/
+ public override val capacity: Double
+ get() = _capacity
private var _capacity = 0.0
/**
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
override fun newInput(key: InterferenceKey?): FlowConsumer {
val provider = Input(_capacity, key)
@@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer(
return provider
}
- override fun addOutput(output: FlowConsumer) {
- val consumer = Output(output)
- if (_outputs.add(output)) {
- _activeOutputs.add(consumer)
- output.startConsumer(consumer)
- }
- }
-
override fun removeInput(input: FlowConsumer) {
if (!_inputs.remove(input)) {
return
@@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer(
(input as Input).close()
}
- override fun clear() {
- for (input in _activeOutputs) {
+ override fun newOutput(): FlowSource {
+ val output = Output()
+ _outputs.add(output)
+ return output
+ }
+
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ // This cast should always succeed since only `Output` instances should be added to `_outputs`
+ (output as Output).cancel()
+ }
+
+ override fun clearInputs() {
+ for (input in _inputs) {
input.cancel()
}
- _activeOutputs.clear()
+ _inputs.clear()
+ }
- for (output in _activeInputs) {
+ override fun clearOutputs() {
+ for (output in _outputs) {
output.cancel()
}
- _activeInputs.clear()
+ _outputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
}
/**
@@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer(
if (_schedulerActive) {
return
}
-
+ val lastSchedulerCycle = _lastSchedulerCycle
+ val delta = max(0, now - lastSchedulerCycle)
_schedulerActive = true
+ _lastSchedulerCycle = now
+
try {
- doSchedule(now)
+ doSchedule(now, delta)
} finally {
_schedulerActive = false
}
@@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer(
/**
* Schedule the inputs over the outputs.
*/
- private fun doSchedule(now: Long) {
+ private fun doSchedule(now: Long, delta: Long) {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
// If there is no work yet, mark the inputs as idle.
if (activeInputs.isEmpty()) {
+ _demand = 0.0
+ _rate = 0.0
return
}
@@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer(
// Remove outputs that have finished
if (!input.isActive) {
+ input.actualRate = 0.0
inputIterator.remove()
}
}
@@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer(
// Divide the available output capacity fairly over the inputs using max-min fair sharing
var remaining = activeInputs.size
- for (input in activeInputs) {
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
val availableShare = availableCapacity / remaining--
val grantedRate = min(input.allowedRate, availableShare)
@@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer(
activeOutputs.sort()
// Divide the requests over the available capacity of the input resources fairly
- for (output in activeOutputs) {
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
val inputCapacity = output.capacity
val fraction = inputCapacity / capacity
val grantedSpeed = rate * fraction
@@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer(
}
/**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
+
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = _capacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta / 1000.0
+
+ _counters.demand += _demand * deltaS
+ _counters.actual += _rate * deltaS
+ _counters.remaining += (previousCapacity - _rate) * deltaS
+ }
+
+ /**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private inner class Input(capacity: Double, val key: InterferenceKey?) :
@@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer(
private var _lastPull: Long = Long.MIN_VALUE
/**
+ * The interference domain this input belongs to.
+ */
+ private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain
+
+ /**
* Close the input.
*
* This method is invoked when the user removes an input from the switch.
@@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer(
check(!_isClosed) { "Cannot re-use closed input" }
_activeInputs += this
-
if (parent != null) {
ctx.shouldConsumerConverge = true
}
@@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer(
doUpdateCounters(delta)
actualRate = 0.0
- this.limit = rate
+ limit = rate
_lastPull = now
runScheduler(now)
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ val lastConverge = _lastConverge
+ val parent = parent
+
+ if (parent != null && (lastConverge < now || _lastConvergeInput == null)) {
+ _lastConverge = now
+ _lastConvergeInput = this
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
@@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
_lastPull = now
+
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == this) {
+ _lastConvergeInput = null
+ }
+
+ // Re-run scheduler to distribute new load
+ runScheduler(now)
}
/* Comparable */
@@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer(
// Compute the performance penalty due to flow interference
val perfScore = if (interferenceDomain != null) {
- val load = _rate / capacity
+ val load = _rate / _capacity
interferenceDomain.apply(key, load)
} else {
1.0
}
val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualRate * deltaS
- val remainingWork = work - actualWork
+ val demand = limit * deltaS
+ val actual = actualRate * deltaS
+ val remaining = (capacity - actualRate) * deltaS
- updateCounters(work, actualWork, remainingWork)
+ updateCounters(demand, actual, remaining)
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ _counters.interference += actual * max(0.0, 1 - perfScore)
}
}
/**
* An internal [FlowSource] implementation for multiplexer outputs.
*/
- private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ private inner class Output : FlowSource, Comparable<Output> {
/**
* The active [FlowConnection] of this source.
*/
- private var _ctx: FlowConnection? = null
+ private var _conn: FlowConnection? = null
/**
* The capacity of this output.
@@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer(
* Push the specified rate to the consumer.
*/
fun push(rate: Double) {
- _ctx?.push(rate)
+ _conn?.push(rate)
}
/**
* Cancel this output.
*/
fun cancel() {
- provider.cancel()
+ _conn?.close()
}
override fun onStart(conn: FlowConnection, now: Long) {
- assert(_ctx == null) { "Source running concurrently" }
- _ctx = conn
+ assert(_conn == null) { "Source running concurrently" }
+ _conn = conn
capacity = conn.capacity
+ _activeOutputs.add(this)
+
updateCapacity()
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _ctx = null
+ _conn = null
capacity = 0.0
+ _activeOutputs.remove(this)
+
updateCapacity()
+
+ runScheduler(now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
@@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer(
updateCapacity()
}
+ // Re-run scheduler to distribute new load
runScheduler(now)
return Long.MAX_VALUE
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index d548451f..12e72b8f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -217,7 +217,7 @@ internal class FlowForwarderTest {
assertEquals(2.0, source.counters.actual)
assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
assertEquals(2000, clock.millis())
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 3475f027..187dacd9 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource
/**
* Test suite for the [ForwardingFlowMultiplexer] class.
*/
-internal class ExclusiveFlowMultiplexerTest {
+internal class ForwardingFlowMultiplexerTest {
/**
* Test a trace workload.
*/
@@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest {
val forwarder = FlowForwarder(engine)
val adapter = FlowSourceRateAdapter(forwarder, speed::add)
source.startConsumer(adapter)
- switch.addOutput(forwarder)
+ forwarder.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
switch.newInput()
assertThrows<IllegalStateException> { switch.newInput() }
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
index 9f6b8a2c..6e2cdb98 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest {
val switch = MaxMinFlowMultiplexer(scheduler)
val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { switch.addOutput(it) }
+ sources.forEach { it.startConsumer(switch.newOutput()) }
val provider = switch.newInput()
val consumer = FixedFlowSource(2000.0, 1.0)
@@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val provider = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
provider.consume(workload)
yield()
} finally {
@@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val providerA = switch.newInput()
val providerB = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
index 2b7c1ad7..6667c80c 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
@@ -63,11 +63,9 @@ public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetwor
get() = _provider
private val _provider = mux.newInput()
- override fun createConsumer(): FlowSource {
- val forwarder = FlowForwarder(engine, isCoupled = true)
- mux.addOutput(forwarder)
- return forwarder
- }
+ private val _source = mux.newOutput()
+
+ override fun createConsumer(): FlowSource = _source
override fun close() {
isClosed = true
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index d536f22d..9f88fecc 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -46,18 +46,14 @@ public class SimPdu(
/**
* The [FlowForwarder] that represents the input of the PDU.
*/
- private val forwarder = FlowForwarder(engine)
+ private val output = mux.newOutput()
/**
* Create a new PDU outlet.
*/
public fun newOutlet(): Outlet = Outlet(mux, mux.newInput())
- init {
- mux.addOutput(forwarder)
- }
-
- override fun createSource(): FlowSource = FlowMapper(forwarder) { _, rate ->
+ override fun createSource(): FlowSource = FlowMapper(output) { _, rate ->
val loss = computePowerLoss(rate)
rate + loss
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
index 312f1d0f..46d659f8 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
@@ -42,19 +42,19 @@ public class SimUps(
/**
* The resource aggregator used to combine the input sources.
*/
- private val switch = MaxMinFlowMultiplexer(engine)
+ private val mux = MaxMinFlowMultiplexer(engine)
/**
* The [FlowConsumer] that represents the output of the UPS.
*/
- private val provider = switch.newInput()
+ private val provider = mux.newInput()
/**
* Create a new UPS outlet.
*/
public fun newInlet(): SimPowerInlet {
val forward = FlowForwarder(engine, isCoupled = true)
- switch.addOutput(forward)
+ forward.startConsumer(mux.newOutput())
return Inlet(forward)
}