summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt196
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt51
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt20
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt27
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt27
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt144
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt67
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt11
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt48
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt17
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt50
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt)66
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt114
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt164
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt)10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt8
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt8
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt6
31 files changed, 704 insertions, 505 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index f499927d..4b96872b 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -50,7 +50,6 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
-import kotlin.math.roundToLong
/**
* A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor].
@@ -63,7 +62,7 @@ public class SimHost(
context: CoroutineContext,
engine: FlowEngine,
meterProvider: MeterProvider,
- hypervisor: SimHypervisorProvider,
+ hypervisorProvider: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -103,29 +102,8 @@ public class SimHost(
/**
* The hypervisor to run multiple workloads.
*/
- private val hypervisor: SimHypervisor = hypervisor.create(
- engine,
- scalingGovernor = scalingGovernor,
- interferenceDomain = interferenceDomain,
- listener = object : SimHypervisor.Listener {
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- _cpuDemand = cpuDemand
- _cpuUsage = cpuUsage
-
- collectTime()
- }
- }
- )
- private var _cpuUsage = 0.0
- private var _cpuDemand = 0.0
+ private val hypervisor: SimHypervisor = hypervisorProvider
+ .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain)
/**
* The virtual machines running on the hypervisor.
@@ -157,22 +135,13 @@ public class SimHost(
}
}
+ /**
+ * The [Job] that represents the machine running the hypervisor.
+ */
+ private var _job: Job? = null
+
init {
- // Launch hypervisor onto machine
- scope.launch {
- try {
- _bootTime = clock.millis()
- _state = HostState.UP
- machine.run(this@SimHost.hypervisor, emptyMap())
- } catch (_: CancellationException) {
- // Ignored
- } catch (cause: Throwable) {
- logger.error(cause) { "Host failed" }
- throw cause
- } finally {
- _state = HostState.DOWN
- }
- }
+ launch()
meter.upDownCounterBuilder("system.guests")
.setDescription("Number of guests on this host")
@@ -184,15 +153,15 @@ public class SimHost(
meter.gaugeBuilder("system.cpu.demand")
.setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(_cpuDemand) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) }
meter.gaugeBuilder("system.cpu.usage")
.setDescription("Amount of CPU resources used by the host")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(_cpuUsage) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) }
meter.gaugeBuilder("system.cpu.utilization")
.setDescription("Utilization of the CPU resources of the host")
.setUnit("%")
- .buildWithCallback { result -> result.observe(_cpuUsage / _cpuLimit) }
+ .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) }
meter.counterBuilder("system.cpu.time")
.setDescription("Amount of CPU time spent by the host")
.setUnit("s")
@@ -200,16 +169,16 @@ public class SimHost(
meter.gaugeBuilder("system.power.usage")
.setDescription("Power usage of the host ")
.setUnit("W")
- .buildWithCallback { result -> result.observe(machine.powerDraw) }
+ .buildWithCallback { result -> result.observe(machine.powerUsage) }
meter.counterBuilder("system.power.total")
.setDescription("Amount of energy used by the CPU")
.setUnit("J")
.ofDoubles()
- .buildWithCallback(::collectPowerTotal)
+ .buildWithCallback { result -> result.observe(machine.energyUsage) }
meter.counterBuilder("system.time")
.setDescription("The uptime of the host")
.setUnit("s")
- .buildWithCallback(::collectTime)
+ .buildWithCallback(::collectUptime)
meter.gaugeBuilder("system.time.boot")
.setDescription("The boot time of the host")
.setUnit("1")
@@ -290,9 +259,12 @@ public class SimHost(
}
public suspend fun recover() {
- collectTime()
- _state = HostState.UP
- _bootTime = clock.millis()
+ updateUptime()
+
+ launch()
+
+ // Wait for the hypervisor to launch before recovering the guests
+ yield()
for (guest in guests.values) {
guest.recover()
@@ -300,14 +272,42 @@ public class SimHost(
}
/**
+ * Launch the hypervisor.
+ */
+ private fun launch() {
+ check(_job == null) { "Concurrent hypervisor running" }
+
+ // Launch hypervisor onto machine
+ _job = scope.launch {
+ try {
+ _bootTime = clock.millis()
+ _state = HostState.UP
+ machine.run(hypervisor, emptyMap())
+ } catch (_: CancellationException) {
+ // Ignored
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Host failed" }
+ throw cause
+ } finally {
+ _state = HostState.DOWN
+ }
+ }
+ }
+
+ /**
* Reset the machine.
*/
private fun reset() {
- collectTime()
+ updateUptime()
+
+ // Stop the hypervisor
+ val job = _job
+ if (job != null) {
+ job.cancel()
+ _job = null
+ }
_state = HostState.DOWN
- _cpuUsage = 0.0
- _cpuDemand = 0.0
}
/**
@@ -385,85 +385,46 @@ public class SimHost(
}
}
- private var _lastCpuTimeCallback = clock.millis()
-
- /**
- * Helper function to track the CPU time of a machine.
- */
- private fun collectCpuTime(result: ObservableLongMeasurement) {
- val now = clock.millis()
- val duration = now - _lastCpuTimeCallback
-
- try {
- collectCpuTime(duration, result)
- } finally {
- _lastCpuTimeCallback = now
- }
- }
-
private val _activeState = Attributes.of(STATE_KEY, "active")
private val _stealState = Attributes.of(STATE_KEY, "steal")
private val _lostState = Attributes.of(STATE_KEY, "lost")
private val _idleState = Attributes.of(STATE_KEY, "idle")
- private var _totalTime = 0.0
/**
* Helper function to track the CPU time of a machine.
*/
- private fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
- val coreCount = this.model.cpuCount
- val d = coreCount / _cpuLimit
-
+ private fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = hypervisor.counters
- val grantedWork = counters.actual
- val overcommittedWork = counters.overcommit
- val interferedWork = counters.interference
- _totalTime += (duration / 1000.0) * coreCount
- val activeTime = (grantedWork * d).roundToLong()
- val idleTime = (_totalTime - grantedWork * d).roundToLong()
- val stealTime = (overcommittedWork * d).roundToLong()
- val lostTime = (interferedWork * d).roundToLong()
-
- result.observe(activeTime, _activeState)
- result.observe(idleTime, _idleState)
- result.observe(stealTime, _stealState)
- result.observe(lostTime, _lostState)
+ result.observe(counters.cpuActiveTime / 1000L, _activeState)
+ result.observe(counters.cpuIdleTime / 1000L, _idleState)
+ result.observe(counters.cpuStealTime / 1000L, _stealState)
+ result.observe(counters.cpuLostTime / 1000L, _lostState)
for (guest in guests.values) {
- guest.collectCpuTime(duration, result)
+ guest.collectCpuTime(result)
}
}
- private var _lastPowerCallback = clock.millis()
- private var _totalPower = 0.0
-
- /**
- * Helper function to collect the total power usage of the machine.
- */
- private fun collectPowerTotal(result: ObservableDoubleMeasurement) {
- val now = clock.millis()
- val duration = now - _lastPowerCallback
-
- _totalPower += duration / 1000.0 * machine.powerDraw
- result.observe(_totalPower)
-
- _lastPowerCallback = now
- }
-
private var _lastReport = clock.millis()
/**
* Helper function to track the uptime of a machine.
*/
- private fun collectTime(result: ObservableLongMeasurement? = null) {
+ private fun updateUptime() {
val now = clock.millis()
val duration = now - _lastReport
+ _lastReport = now
- try {
- collectTime(duration, result)
- } finally {
- _lastReport = now
+ if (_state == HostState.UP) {
+ _uptime += duration
+ } else if (_state == HostState.DOWN && scope.isActive) {
+ // Only increment downtime if the machine is in a failure state
+ _downtime += duration
+ }
+
+ for (guest in guests.values) {
+ guest.updateUptime(duration)
}
}
@@ -475,19 +436,14 @@ public class SimHost(
/**
* Helper function to track the uptime of a machine.
*/
- private fun collectTime(duration: Long, result: ObservableLongMeasurement? = null) {
- if (state == HostState.UP) {
- _uptime += duration
- } else if (state == HostState.DOWN && scope.isActive) {
- // Only increment downtime if the machine is in a failure state
- _downtime += duration
- }
+ private fun collectUptime(result: ObservableLongMeasurement) {
+ updateUptime()
- result?.observe(_uptime, _upState)
- result?.observe(_downtime, _downState)
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
for (guest in guests.values) {
- guest.collectUptime(duration, result)
+ guest.collectUptime(result)
}
}
@@ -496,9 +452,9 @@ public class SimHost(
/**
* Helper function to track the boot time of a machine.
*/
- private fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ private fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result?.observe(_bootTime)
+ result.observe(_bootTime)
}
for (guest in guests.values) {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index eda76ba0..3ac165c8 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -33,12 +33,10 @@ import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.simulator.SimHost
import org.opendc.compute.simulator.SimWorkloadMapper
-import org.opendc.simulator.compute.SimAbstractMachine
-import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.kernel.SimVirtualMachine
import org.opendc.simulator.compute.workload.SimWorkload
import java.time.Clock
import kotlin.coroutines.CoroutineContext
-import kotlin.math.roundToLong
/**
* A virtual machine instance that is managed by a [SimHost].
@@ -50,7 +48,7 @@ internal class Guest(
private val mapper: SimWorkloadMapper,
private val listener: GuestListener,
val server: Server,
- val machine: SimMachine
+ val machine: SimVirtualMachine
) {
/**
* The [CoroutineScope] of the guest.
@@ -236,17 +234,22 @@ internal class Guest(
.build()
/**
- * Helper function to track the uptime of the guest.
+ * Helper function to track the uptime and downtime of the guest.
*/
- fun collectUptime(duration: Long, result: ObservableLongMeasurement? = null) {
+ fun updateUptime(duration: Long) {
if (state == ServerState.RUNNING) {
_uptime += duration
} else if (state == ServerState.ERROR) {
_downtime += duration
}
+ }
- result?.observe(_uptime, _upState)
- result?.observe(_downtime, _downState)
+ /**
+ * Helper function to track the uptime of the guest.
+ */
+ fun collectUptime(result: ObservableLongMeasurement) {
+ result.observe(_uptime, _upState)
+ result.observe(_downtime, _downState)
}
private var _bootTime = Long.MIN_VALUE
@@ -254,9 +257,9 @@ internal class Guest(
/**
* Helper function to track the boot time of the guest.
*/
- fun collectBootTime(result: ObservableLongMeasurement? = null) {
+ fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result?.observe(_bootTime)
+ result.observe(_bootTime)
}
}
@@ -276,33 +279,17 @@ internal class Guest(
.putAll(attributes)
.put(STATE_KEY, "idle")
.build()
- private var _totalTime = 0.0
/**
* Helper function to track the CPU time of a machine.
*/
- fun collectCpuTime(duration: Long, result: ObservableLongMeasurement) {
- val coreCount = server.flavor.cpuCount
- val d = coreCount / _cpuLimit
-
- var grantedWork = 0.0
- var overcommittedWork = 0.0
-
- for (cpu in (machine as SimAbstractMachine).cpus) {
- val counters = cpu.counters
- grantedWork += counters.actual
- overcommittedWork += counters.overcommit
- }
-
- _totalTime += (duration / 1000.0) * coreCount
- val activeTime = (grantedWork * d).roundToLong()
- val idleTime = (_totalTime - grantedWork * d).roundToLong()
- val stealTime = (overcommittedWork * d).roundToLong()
+ fun collectCpuTime(result: ObservableLongMeasurement) {
+ val counters = machine.counters
- result.observe(activeTime, _activeState)
- result.observe(idleTime, _idleState)
- result.observe(stealTime, _stealState)
- result.observe(0, _lostState)
+ result.observe(counters.cpuActiveTime / 1000, _activeState)
+ result.observe(counters.cpuIdleTime / 1000, _idleState)
+ result.observe(counters.cpuStealTime / 1000, _stealState)
+ result.observe(counters.cpuLostTime / 1000, _lostState)
}
private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index d2293be7..26089b6d 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -170,9 +170,9 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(659, activeTime, "Active time does not match") },
- { assertEquals(2342, idleTime, "Idle time does not match") },
- { assertEquals(638, stealTime, "Steal time does not match") },
+ { assertEquals(658, activeTime, "Active time does not match") },
+ { assertEquals(1741, idleTime, "Idle time does not match") },
+ { assertEquals(637, stealTime, "Steal time does not match") },
{ assertEquals(1500001, clock.millis()) }
)
}
@@ -253,7 +253,7 @@ internal class SimHostTest {
host.spawn(server)
delay(5000L)
host.fail()
- delay(5000L)
+ delay(duration * 1000)
host.recover()
suspendCancellableCoroutine<Unit> { cont ->
@@ -274,12 +274,12 @@ internal class SimHostTest {
reader.close()
assertAll(
- { assertEquals(2661, idleTime, "Idle time does not match") },
- { assertEquals(339, activeTime, "Active time does not match") },
- { assertEquals(1195001, uptime, "Uptime does not match") },
- { assertEquals(5000, downtime, "Downtime does not match") },
- { assertEquals(1195000, guestUptime, "Guest uptime does not match") },
- { assertEquals(5000, guestDowntime, "Guest downtime does not match") },
+ { assertEquals(1175, idleTime, "Idle time does not match") },
+ { assertEquals(624, activeTime, "Active time does not match") },
+ { assertEquals(900001, uptime, "Uptime does not match") },
+ { assertEquals(300000, downtime, "Downtime does not match") },
+ { assertEquals(900000, guestUptime, "Guest uptime does not match") },
+ { assertEquals(300000, guestDowntime, "Guest downtime does not match") },
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 67d39ffa..9d540118 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -116,11 +116,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223327751, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(67009849, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3155964, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.840207707767459E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -160,10 +160,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10998184, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9740216, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.009945802750012E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -209,9 +210,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6009751, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14728649, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12526520, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(480866, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -252,9 +253,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(11133606, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9604794, exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(1311, exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
{ assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index c57919c1..d654d58a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -92,7 +92,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
@@ -113,7 +113,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(engine)
+ val hypervisor = SimFairShareHypervisor(engine, null, null, null)
launch { machine.run(hypervisor) }
@@ -134,7 +134,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(engine)
+ val hypervisor = SimFairShareHypervisor(engine, null, null, null)
launch { machine.run(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 0bcf5957..9140d31b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -28,6 +28,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.FlowEngine
+import kotlin.math.max
/**
* A simulated bare-metal machine that is able to run a single workload.
@@ -49,10 +50,18 @@ public class SimBareMetalMachine(
parent: FlowConvergenceListener? = null,
) : SimAbstractMachine(engine, parent, model) {
/**
- * The power draw of the machine onto the PSU.
+ * The current power usage of the machine (without PSU loss) in W.
*/
- public val powerDraw: Double
- get() = powerDriverLogic.computePower()
+ public val powerUsage: Double
+ get() = _powerUsage
+ private var _powerUsage = 0.0
+
+ /**
+ * The total energy usage of the machine (without PSU loss) in Joules.
+ */
+ public val energyUsage: Double
+ get() = _energyUsage
+ private var _energyUsage = 0.0
/**
* The processing units of the machine.
@@ -66,8 +75,20 @@ public class SimBareMetalMachine(
*/
private val powerDriverLogic = powerDriver.createLogic(this, cpus)
+ private var _lastConverge = Long.MAX_VALUE
+
override fun onConverge(now: Long, delta: Long) {
+ // Update the PSU stage
psu.update()
+
+ val lastConverge = _lastConverge
+ _lastConverge = now
+ val duration = max(0, now - lastConverge)
+ if (duration > 0) {
+ // Compute the power and energy usage of the machine
+ _energyUsage += _powerUsage * (duration / 1000.0)
+ _powerUsage = powerDriverLogic.computePower()
+ }
}
init {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index bcba8e8e..aac8b959 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -30,6 +30,7 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.mux.FlowMultiplexer
+import kotlin.math.roundToLong
/**
* Abstract implementation of the [SimHypervisor] interface.
@@ -39,18 +40,19 @@ import org.opendc.simulator.flow.mux.FlowMultiplexer
*/
public abstract class SimAbstractHypervisor(
protected val engine: FlowEngine,
- private val scalingGovernor: ScalingGovernor? = null,
+ private val listener: FlowConvergenceListener?,
+ private val scalingGovernor: ScalingGovernor?,
protected val interferenceDomain: VmInterferenceDomain? = null
-) : SimHypervisor {
+) : SimHypervisor, FlowConvergenceListener {
/**
* The machine on which the hypervisor runs.
*/
- private lateinit var context: SimMachineContext
+ protected lateinit var context: SimMachineContext
/**
* The resource switch to use.
*/
- private lateinit var mux: FlowMultiplexer
+ protected abstract val mux: FlowMultiplexer
/**
* The virtual machines running on this hypervisor.
@@ -62,39 +64,73 @@ public abstract class SimAbstractHypervisor(
/**
* The resource counters associated with the hypervisor.
*/
- public override val counters: FlowCounters
- get() = mux.counters
+ public override val counters: SimHypervisorCounters
+ get() = _counters
+ private val _counters = object : SimHypervisorCounters {
+ @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity
+
+ override var cpuActiveTime: Long = 0L
+ override var cpuIdleTime: Long = 0L
+ override var cpuStealTime: Long = 0L
+ override var cpuLostTime: Long = 0L
+
+ private var _previousDemand = 0.0
+ private var _previousActual = 0.0
+ private var _previousRemaining = 0.0
+ private var _previousInterference = 0.0
+
+ /**
+ * Record the CPU time of the hypervisor.
+ */
+ fun record() {
+ val counters = mux.counters
+ val demand = counters.demand
+ val actual = counters.actual
+ val remaining = counters.remaining
+ val interference = counters.interference
+
+ val demandDelta = demand - _previousDemand
+ val actualDelta = actual - _previousActual
+ val remainingDelta = remaining - _previousRemaining
+ val interferenceDelta = interference - _previousInterference
+
+ _previousDemand = demand
+ _previousActual = actual
+ _previousRemaining = remaining
+ _previousInterference = interference
+
+ cpuActiveTime += (actualDelta * d).roundToLong()
+ cpuIdleTime += (remainingDelta * d).roundToLong()
+ cpuStealTime += ((demandDelta - actualDelta) * d).roundToLong()
+ cpuLostTime += (interferenceDelta * d).roundToLong()
+ }
+ }
/**
- * The scaling governors attached to the physical CPUs backing this hypervisor.
+ * The CPU capacity of the hypervisor in MHz.
*/
- private val governors = mutableListOf<ScalingGovernor.Logic>()
+ override val cpuCapacity: Double
+ get() = mux.capacity
/**
- * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs.
+ * The CPU demand of the hypervisor in MHz.
*/
- public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer
+ override val cpuDemand: Double
+ get() = mux.demand
/**
- * Check whether the specified machine model fits on this hypervisor.
+ * The CPU usage of the hypervisor in MHz.
*/
- public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean
+ override val cpuUsage: Double
+ get() = mux.rate
/**
- * Trigger the governors to recompute the scaling limits.
+ * The scaling governors attached to the physical CPUs backing this hypervisor.
*/
- protected fun triggerGovernors(load: Double) {
- for (governor in governors) {
- governor.onLimit(load)
- }
- }
+ private val governors = mutableListOf<ScalingGovernor.Logic>()
/* SimHypervisor */
- override fun canFit(model: MachineModel): Boolean {
- return canFit(model, mux)
- }
-
- override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine {
+ override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
require(canFit(model)) { "Machine does not fit" }
val vm = VirtualMachine(model, interferenceId)
_vms.add(vm)
@@ -104,7 +140,13 @@ public abstract class SimAbstractHypervisor(
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
- mux = createMultiplexer(ctx)
+
+ _cpuCount = ctx.cpus.size
+ _cpuCapacity = ctx.cpus.sumOf { it.model.frequency }
+ _counters.d = _cpuCount / _cpuCapacity * 1000L
+
+ // Clear the existing outputs of the multiplexer
+ mux.clearOutputs()
for (cpu in ctx.cpus) {
val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu))
@@ -113,16 +155,31 @@ public abstract class SimAbstractHypervisor(
governor.onStart()
}
- mux.addOutput(cpu)
+ cpu.startConsumer(mux.newOutput())
}
}
+ private var _cpuCount = 0
+ private var _cpuCapacity = 0.0
+
+ /* FlowConvergenceListener */
+ override fun onConverge(now: Long, delta: Long) {
+ _counters.record()
+
+ val load = cpuDemand / cpuCapacity
+ for (governor in governors) {
+ governor.onLimit(load)
+ }
+
+ listener?.onConverge(now, delta)
+ }
+
/**
* A virtual machine running on the hypervisor.
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) {
+ private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
/**
* The interference key of this virtual machine.
*/
@@ -133,6 +190,41 @@ public abstract class SimAbstractHypervisor(
*/
override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) }
+ /**
+ * The resource counters associated with the hypervisor.
+ */
+ override val counters: SimHypervisorCounters
+ get() = _counters
+ private val _counters = object : SimHypervisorCounters {
+ private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
+
+ override val cpuActiveTime: Long
+ get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
+ override val cpuIdleTime: Long
+ get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
+ override val cpuStealTime: Long
+ get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
+ override val cpuLostTime: Long = 0L
+ }
+
+ /**
+ * The CPU capacity of the hypervisor in MHz.
+ */
+ override val cpuCapacity: Double
+ get() = cpus.sumOf(FlowConsumer::capacity)
+
+ /**
+ * The CPU demand of the hypervisor in MHz.
+ */
+ override val cpuDemand: Double
+ get() = cpus.sumOf(FlowConsumer::demand)
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ override val cpuUsage: Double
+ get() = cpus.sumOf(FlowConsumer::rate)
+
override fun close() {
super.close()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index b0515c6e..36f76650 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
-import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
@@ -38,64 +37,20 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
* concurrently using weighted fair sharing.
*
* @param engine The [FlowEngine] to manage the machine's resources.
- * @param parent The parent simulation system.
+ * @param listener The listener for the convergence of the system.
* @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
* @param interferenceDomain The resource interference domain to which the hypervisor belongs.
- * @param listener The hypervisor listener to use.
*/
public class SimFairShareHypervisor(
engine: FlowEngine,
- private val parent: FlowConvergenceListener? = null,
- scalingGovernor: ScalingGovernor? = null,
- interferenceDomain: VmInterferenceDomain? = null,
- private val listener: SimHypervisor.Listener? = null
-) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) {
-
- override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true
-
- override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
- return SwitchSystem(ctx).switch
- }
-
- private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowConvergenceListener {
- val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
-
- private var lastCpuUsage = 0.0
- private var lastCpuDemand = 0.0
- private var lastDemand = 0.0
- private var lastActual = 0.0
- private var lastOvercommit = 0.0
- private var lastInterference = 0.0
- private var lastReport = Long.MIN_VALUE
-
- override fun onConverge(now: Long, delta: Long) {
- val listener = listener ?: return
- val counters = switch.counters
-
- if (now > lastReport) {
- listener.onSliceFinish(
- this@SimFairShareHypervisor,
- counters.demand - lastDemand,
- counters.actual - lastActual,
- counters.overcommit - lastOvercommit,
- counters.interference - lastInterference,
- lastCpuUsage,
- lastCpuDemand
- )
- }
- lastReport = now
-
- lastCpuDemand = switch.outputs.sumOf { it.demand }
- lastCpuUsage = switch.outputs.sumOf { it.rate }
- lastDemand = counters.demand
- lastActual = counters.actual
- lastOvercommit = counters.overcommit
- lastInterference = counters.interference
-
- val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency }
- triggerGovernors(load)
-
- parent?.onConverge(now, delta)
- }
- }
+ listener: FlowConvergenceListener?,
+ scalingGovernor: ScalingGovernor?,
+ interferenceDomain: VmInterferenceDomain?,
+) : SimAbstractHypervisor(engine, listener, scalingGovernor, interferenceDomain) {
+ /**
+ * The multiplexer that distributes the computing capacity.
+ */
+ override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
+
+ override fun canFit(model: MachineModel): Boolean = true
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
index e0a70926..3136f4c8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
@@ -35,15 +35,8 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override fun create(
engine: FlowEngine,
- parent: FlowConvergenceListener?,
+ listener: FlowConvergenceListener?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
- listener: SimHypervisor.Listener?
- ): SimHypervisor = SimFairShareHypervisor(
- engine,
- parent,
- scalingGovernor = scalingGovernor,
- interferenceDomain = interferenceDomain,
- listener = listener
- )
+ ): SimHypervisor = SimFairShareHypervisor(engine, listener, scalingGovernor, interferenceDomain)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 1b11ca6b..57d4cf20 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow.FlowCounters
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
@@ -40,7 +39,22 @@ public interface SimHypervisor : SimWorkload {
/**
* The resource counters associated with the hypervisor.
*/
- public val counters: FlowCounters
+ public val counters: SimHypervisorCounters
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU capacity of the hypervisor in MHz.
+ */
+ public val cpuCapacity: Double
/**
* Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
@@ -53,23 +67,5 @@ public interface SimHypervisor : SimWorkload {
* @param model The machine to create.
* @param interferenceId An identifier for the interference model.
*/
- public fun createMachine(model: MachineModel, interferenceId: String? = null): SimMachine
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
+ public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
new file mode 100644
index 00000000..030d9c5f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel
+
+/**
+ * Performance counters of a [SimHypervisor].
+ */
+public interface SimHypervisorCounters {
+ /**
+ * The amount of time (in milliseconds) the CPUs of the hypervisor were actively running.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The amount of time (in milliseconds) the CPUs of the hypervisor were idle.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The amount of CPU time (in milliseconds) that virtual machines were ready to run, but were not able to.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
index dad2cc3b..483217af 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
@@ -40,13 +40,12 @@ public interface SimHypervisorProvider {
public val id: String
/**
- * Create a [SimHypervisor] instance with the specified [listener].
+ * Create a new [SimHypervisor] instance.
*/
public fun create(
engine: FlowEngine,
- parent: FlowConvergenceListener? = null,
+ listener: FlowConvergenceListener? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
- listener: SimHypervisor.Listener? = null
): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
index 883e0d82..82f8df38 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
@@ -22,8 +22,9 @@
package org.opendc.simulator.compute.kernel
-import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.flow.FlowConvergenceListener
import org.opendc.simulator.flow.FlowEngine
import org.opendc.simulator.flow.mux.FlowMultiplexer
import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
@@ -31,12 +32,14 @@ import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) {
- override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean {
- return switch.outputs.size - switch.inputs.size >= model.cpus.size
- }
+public class SimSpaceSharedHypervisor(
+ engine: FlowEngine,
+ listener: FlowConvergenceListener?,
+ scalingGovernor: ScalingGovernor?,
+) : SimAbstractHypervisor(engine, listener, scalingGovernor) {
+ override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine)
- override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
- return ForwardingFlowMultiplexer(engine)
+ override fun canFit(model: MachineModel): Boolean {
+ return mux.outputs.size - mux.inputs.size >= model.cpus.size
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
index 93921eb9..dd6fb0b1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
@@ -35,9 +35,8 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override fun create(
engine: FlowEngine,
- parent: FlowConvergenceListener?,
+ listener: FlowConvergenceListener?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
- listener: SimHypervisor.Listener?
- ): SimHypervisor = SimSpaceSharedHypervisor(engine)
+ ): SimHypervisor = SimSpaceSharedHypervisor(engine, listener, scalingGovernor)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt
new file mode 100644
index 00000000..36219ef2
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimVirtualMachine.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel
+
+import org.opendc.simulator.compute.SimMachine
+
+/**
+ * A virtual [SimMachine] running on top of another [SimMachine].
+ */
+public interface SimVirtualMachine : SimMachine {
+ /**
+ * The resource counters associated with the virtual machine.
+ */
+ public val counters: SimHypervisorCounters
+
+ /**
+ * The CPU usage of the VM in MHz.
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The CPU usage of the VM in MHz.
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU capacity of the VM in MHz.
+ */
+ public val cpuCapacity: Double
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 058d5d28..9db2e6ec 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -46,7 +46,7 @@ import org.opendc.simulator.flow.FlowEngine
* Test suite for the [SimHypervisor] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-internal class SimHypervisorTest {
+internal class SimFairShareHypervisorTest {
private lateinit var model: MachineModel
@BeforeEach
@@ -63,26 +63,6 @@ internal class SimHypervisorTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val listener = object : SimHypervisor.Listener {
- var totalRequestedWork = 0.0
- var totalGrantedWork = 0.0
- var totalOvercommittedWork = 0.0
-
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += totalWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
@@ -96,7 +76,7 @@ internal class SimHypervisorTest {
val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null)
launch {
machine.run(hypervisor)
@@ -111,9 +91,9 @@ internal class SimHypervisorTest {
machine.close()
assertAll(
- { assertEquals(1113300.0, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300.0, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") },
+ { assertEquals(880219, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
+ { assertEquals(28125, hypervisor.counters.cpuStealTime, "Steal time does not match") },
{ assertEquals(1200000, clock.millis()) { "Current time is correct" } }
)
}
@@ -123,26 +103,6 @@ internal class SimHypervisorTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val listener = object : SimHypervisor.Listener {
- var totalRequestedWork = 0.0
- var totalGrantedWork = 0.0
- var totalOvercommittedWork = 0.0
-
- override fun onSliceFinish(
- hypervisor: SimHypervisor,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += totalWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
@@ -167,7 +127,7 @@ internal class SimHypervisorTest {
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(platform, listener = listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, null, null)
launch {
machine.run(hypervisor)
@@ -189,9 +149,9 @@ internal class SimHypervisorTest {
yield()
assertAll(
- { assertEquals(2073600.0, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1053600.0, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(329250, hypervisor.counters.cpuActiveTime, "Active time does not match") },
+ { assertEquals(870750, hypervisor.counters.cpuIdleTime, "Idle time does not match") },
+ { assertEquals(318750, hypervisor.counters.cpuStealTime, "Steal time does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -205,10 +165,8 @@ internal class SimHypervisorTest {
)
val platform = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimFairShareHypervisor(platform)
+ val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimFairShareHypervisor(platform, null, null, null)
assertDoesNotThrow {
launch {
@@ -238,7 +196,7 @@ internal class SimHypervisorTest {
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(platform, interferenceDomain = interferenceModel.newDomain())
+ val hypervisor = SimFairShareHypervisor(platform, null, null, interferenceModel.newDomain())
val duration = 5 * 60L
val workloadA =
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 95fb6679..b05ffd22 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -75,10 +75,8 @@ internal class SimSpaceSharedHypervisorTest {
)
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
val vm = hypervisor.createMachine(machineModel)
@@ -99,10 +97,8 @@ internal class SimSpaceSharedHypervisorTest {
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -125,7 +121,7 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -146,7 +142,7 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -172,7 +168,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testConcurrentWorkloadFails() = runBlockingSimulation {
val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimSpaceSharedHypervisor(engine)
+ val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
launch { machine.run(hypervisor) }
yield()
@@ -196,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(
interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null)
launch { machine.run(hypervisor) }
yield()
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index 4834f10f..e927f81d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -83,8 +83,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -96,8 +96,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(3) {
launch {
@@ -113,8 +113,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -126,8 +126,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(2) {
launch {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index c8092082..b02426e3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
/**
* The previous demand for the consumer.
*/
- private var previousDemand = 0.0
+ private var _previousDemand = 0.0
+ private var _previousCapacity = 0.0
/**
* Update the counters of the flow consumer.
*/
protected fun updateCounters(ctx: FlowConnection, delta: Long) {
- val demand = previousDemand
- previousDemand = ctx.demand
+ val demand = _previousDemand
+ val capacity = _previousCapacity
+
+ _previousDemand = ctx.demand
+ _previousCapacity = ctx.capacity
if (delta <= 0) {
return
@@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
val counters = _counters
val deltaS = delta / 1000.0
- val work = demand * deltaS
+ val total = demand * deltaS
+ val work = capacity * deltaS
val actualWork = ctx.rate * deltaS
- val remainingWork = work - actualWork
counters.demand += work
counters.actual += actualWork
- counters.overcommit += remainingWork
+ counters.remaining += (total - actualWork)
}
/**
* Update the counters of the flow consumer.
*/
- protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
+ protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
val counters = _counters
counters.demand += demand
counters.actual += actual
- counters.overcommit += overcommit
+ counters.remaining += remaining
}
final override fun startConsumer(source: FlowSource) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index e15d7643..a717ae6e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -37,9 +37,9 @@ public interface FlowCounters {
public val actual: Double
/**
- * The accumulated flow that could not be transferred over the connection.
+ * The amount of capacity that was not utilized.
*/
- public val overcommit: Double
+ public val remaining: Double
/**
* The accumulated flow lost due to interference between sources.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 17de601a..7eaaf6c2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
val counters = _counters
val deltaS = delta / 1000.0
+ val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
counters.demand += work
counters.actual += actualWork
- counters.overcommit += (work - actualWork)
+ counters.remaining += (total - actualWork)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
index 141d335d..d2fa5228 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters
internal class FlowCountersImpl : FlowCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
- override var overcommit: Double = 0.0
+ override var remaining: Double = 0.0
override var interference: Double = 0.0
override fun reset() {
demand = 0.0
actual = 0.0
- overcommit = 0.0
+ remaining = 0.0
interference = 0.0
}
override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 17b82391..04ba7f21 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -39,7 +39,22 @@ public interface FlowMultiplexer {
/**
* The outputs of the multiplexer over which the flows will be distributed.
*/
- public val outputs: Set<FlowConsumer>
+ public val outputs: Set<FlowSource>
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ public val rate: Double
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ public val demand: Double
+
+ /**
+ * The capacity of the outputs.
+ */
+ public val capacity: Double
/**
* The flow counters to track the flow metrics of all multiplexer inputs.
@@ -59,12 +74,27 @@ public interface FlowMultiplexer {
public fun removeInput(input: FlowConsumer)
/**
- * Add the specified [output] to the multiplexer.
+ * Create a new output on this multiplexer.
*/
- public fun addOutput(output: FlowConsumer)
+ public fun newOutput(): FlowSource
/**
- * Clear all inputs and outputs from the switch.
+ * Remove [output] from this multiplexer.
+ */
+ public fun removeOutput(output: FlowSource)
+
+ /**
+ * Clear all inputs and outputs from the multiplexer.
*/
public fun clear()
+
+ /**
+ * Clear the inputs of the multiplexer.
+ */
+ public fun clearInputs()
+
+ /**
+ * Clear the outputs of the multiplexer.
+ */
+ public fun clearOutputs()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 6dd9dcfb..125d10fe 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
get() = _inputs
private val _inputs = mutableSetOf<Input>()
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
- private val _availableOutputs = ArrayDeque<FlowForwarder>()
+ private val _outputs = mutableSetOf<Output>()
+ private val _availableOutputs = ArrayDeque<Output>()
override val counters: FlowCounters = object : FlowCounters {
override val demand: Double
- get() = _outputs.sumOf { it.counters.demand }
+ get() = _outputs.sumOf { it.forwarder.counters.demand }
override val actual: Double
- get() = _outputs.sumOf { it.counters.actual }
- override val overcommit: Double
- get() = _outputs.sumOf { it.counters.overcommit }
+ get() = _outputs.sumOf { it.forwarder.counters.actual }
+ override val remaining: Double
+ get() = _outputs.sumOf { it.forwarder.counters.remaining }
override val interference: Double
- get() = _outputs.sumOf { it.counters.interference }
+ get() = _outputs.sumOf { it.forwarder.counters.interference }
override fun reset() {
- for (input in _outputs) {
- input.counters.reset()
+ for (output in _outputs) {
+ output.forwarder.counters.reset()
}
}
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
}
+ override val rate: Double
+ get() = _outputs.sumOf { it.forwarder.rate }
+
+ override val demand: Double
+ get() = _outputs.sumOf { it.forwarder.demand }
+
+ override val capacity: Double
+ get() = _outputs.sumOf { it.forwarder.capacity }
+
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val output = Input(forwarder)
- _inputs += output
- return output
+ val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val input = Input(output)
+ _inputs += input
+ return input
}
override fun removeInput(input: FlowConsumer) {
@@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
return
}
- (input as Input).close()
+ val output = (input as Input).output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- override fun addOutput(output: FlowConsumer) {
- if (output in outputs) {
- return
- }
-
+ override fun newOutput(): FlowSource {
val forwarder = FlowForwarder(engine)
+ val output = Output(forwarder)
_outputs += output
- _availableOutputs += forwarder
+ return output
+ }
- output.startConsumer(object : FlowSource by forwarder {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _outputs -= output
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
- forwarder.onStop(conn, now, delta)
- }
- })
+ val forwarder = (output as Output).forwarder
+ forwarder.close()
}
- override fun clear() {
- for (input in _outputs) {
- input.cancel()
+ override fun clearInputs() {
+ for (input in _inputs) {
+ val output = input.output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- _outputs.clear()
- // Inputs are implicitly cancelled by the output forwarders
_inputs.clear()
}
+ override fun clearOutputs() {
+ for (output in _outputs) {
+ output.forwarder.cancel()
+ }
+ _outputs.clear()
+ _availableOutputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
+ }
+
/**
* An input on the multiplexer.
*/
- private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
- /**
- * Close the input.
- */
- fun close() {
- // We explicitly do not close the forwarder here in order to re-use it across input resources.
- _inputs -= this
- _availableOutputs += forwarder
+ private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+
+ /**
+ * An output on the multiplexer.
+ */
+ private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _availableOutputs += this
+ forwarder.onStart(conn, now)
}
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ forwarder.cancel()
+ forwarder.onStop(conn, now, delta)
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Output"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 7232df35..5ff0fb8d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer(
/**
* The outputs of the multiplexer.
*/
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _outputs = mutableSetOf<Output>()
private val _activeOutputs = mutableListOf<Output>()
/**
@@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer(
/**
* The actual processing rate of the multiplexer.
*/
+ public override val rate: Double
+ get() = _rate
private var _rate = 0.0
/**
* The demanded processing rate of the input.
*/
+ public override val demand: Double
+ get() = _demand
private var _demand = 0.0
/**
* The capacity of the outputs.
*/
+ public override val capacity: Double
+ get() = _capacity
private var _capacity = 0.0
/**
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
override fun newInput(key: InterferenceKey?): FlowConsumer {
val provider = Input(_capacity, key)
@@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer(
return provider
}
- override fun addOutput(output: FlowConsumer) {
- val consumer = Output(output)
- if (_outputs.add(output)) {
- _activeOutputs.add(consumer)
- output.startConsumer(consumer)
- }
- }
-
override fun removeInput(input: FlowConsumer) {
if (!_inputs.remove(input)) {
return
@@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer(
(input as Input).close()
}
- override fun clear() {
- for (input in _activeOutputs) {
+ override fun newOutput(): FlowSource {
+ val output = Output()
+ _outputs.add(output)
+ return output
+ }
+
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ // This cast should always succeed since only `Output` instances should be added to `_outputs`
+ (output as Output).cancel()
+ }
+
+ override fun clearInputs() {
+ for (input in _inputs) {
input.cancel()
}
- _activeOutputs.clear()
+ _inputs.clear()
+ }
- for (output in _activeInputs) {
+ override fun clearOutputs() {
+ for (output in _outputs) {
output.cancel()
}
- _activeInputs.clear()
+ _outputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
}
/**
@@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer(
if (_schedulerActive) {
return
}
-
+ val lastSchedulerCycle = _lastSchedulerCycle
+ val delta = max(0, now - lastSchedulerCycle)
_schedulerActive = true
+ _lastSchedulerCycle = now
+
try {
- doSchedule(now)
+ doSchedule(now, delta)
} finally {
_schedulerActive = false
}
@@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer(
/**
* Schedule the inputs over the outputs.
*/
- private fun doSchedule(now: Long) {
+ private fun doSchedule(now: Long, delta: Long) {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
// If there is no work yet, mark the inputs as idle.
if (activeInputs.isEmpty()) {
+ _demand = 0.0
+ _rate = 0.0
return
}
@@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer(
// Remove outputs that have finished
if (!input.isActive) {
+ input.actualRate = 0.0
inputIterator.remove()
}
}
@@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer(
// Divide the available output capacity fairly over the inputs using max-min fair sharing
var remaining = activeInputs.size
- for (input in activeInputs) {
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
val availableShare = availableCapacity / remaining--
val grantedRate = min(input.allowedRate, availableShare)
@@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer(
activeOutputs.sort()
// Divide the requests over the available capacity of the input resources fairly
- for (output in activeOutputs) {
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
val inputCapacity = output.capacity
val fraction = inputCapacity / capacity
val grantedSpeed = rate * fraction
@@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer(
}
/**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
+
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = _capacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta / 1000.0
+
+ _counters.demand += _demand * deltaS
+ _counters.actual += _rate * deltaS
+ _counters.remaining += (previousCapacity - _rate) * deltaS
+ }
+
+ /**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private inner class Input(capacity: Double, val key: InterferenceKey?) :
@@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer(
private var _lastPull: Long = Long.MIN_VALUE
/**
+ * The interference domain this input belongs to.
+ */
+ private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain
+
+ /**
* Close the input.
*
* This method is invoked when the user removes an input from the switch.
@@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer(
check(!_isClosed) { "Cannot re-use closed input" }
_activeInputs += this
-
if (parent != null) {
ctx.shouldConsumerConverge = true
}
@@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer(
doUpdateCounters(delta)
actualRate = 0.0
- this.limit = rate
+ limit = rate
_lastPull = now
runScheduler(now)
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ val lastConverge = _lastConverge
+ val parent = parent
+
+ if (parent != null && (lastConverge < now || _lastConvergeInput == null)) {
+ _lastConverge = now
+ _lastConvergeInput = this
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
@@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
_lastPull = now
+
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == this) {
+ _lastConvergeInput = null
+ }
+
+ // Re-run scheduler to distribute new load
+ runScheduler(now)
}
/* Comparable */
@@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer(
// Compute the performance penalty due to flow interference
val perfScore = if (interferenceDomain != null) {
- val load = _rate / capacity
+ val load = _rate / _capacity
interferenceDomain.apply(key, load)
} else {
1.0
}
val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualRate * deltaS
- val remainingWork = work - actualWork
+ val demand = limit * deltaS
+ val actual = actualRate * deltaS
+ val remaining = (capacity - actualRate) * deltaS
- updateCounters(work, actualWork, remainingWork)
+ updateCounters(demand, actual, remaining)
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ _counters.interference += actual * max(0.0, 1 - perfScore)
}
}
/**
* An internal [FlowSource] implementation for multiplexer outputs.
*/
- private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ private inner class Output : FlowSource, Comparable<Output> {
/**
* The active [FlowConnection] of this source.
*/
- private var _ctx: FlowConnection? = null
+ private var _conn: FlowConnection? = null
/**
* The capacity of this output.
@@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer(
* Push the specified rate to the consumer.
*/
fun push(rate: Double) {
- _ctx?.push(rate)
+ _conn?.push(rate)
}
/**
* Cancel this output.
*/
fun cancel() {
- provider.cancel()
+ _conn?.close()
}
override fun onStart(conn: FlowConnection, now: Long) {
- assert(_ctx == null) { "Source running concurrently" }
- _ctx = conn
+ assert(_conn == null) { "Source running concurrently" }
+ _conn = conn
capacity = conn.capacity
+ _activeOutputs.add(this)
+
updateCapacity()
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _ctx = null
+ _conn = null
capacity = 0.0
+ _activeOutputs.remove(this)
+
updateCapacity()
+
+ runScheduler(now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
@@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer(
updateCapacity()
}
+ // Re-run scheduler to distribute new load
runScheduler(now)
return Long.MAX_VALUE
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index d548451f..12e72b8f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -217,7 +217,7 @@ internal class FlowForwarderTest {
assertEquals(2.0, source.counters.actual)
assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
assertEquals(2000, clock.millis())
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 3475f027..187dacd9 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource
/**
* Test suite for the [ForwardingFlowMultiplexer] class.
*/
-internal class ExclusiveFlowMultiplexerTest {
+internal class ForwardingFlowMultiplexerTest {
/**
* Test a trace workload.
*/
@@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest {
val forwarder = FlowForwarder(engine)
val adapter = FlowSourceRateAdapter(forwarder, speed::add)
source.startConsumer(adapter)
- switch.addOutput(forwarder)
+ forwarder.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
switch.newInput()
assertThrows<IllegalStateException> { switch.newInput() }
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
index 9f6b8a2c..6e2cdb98 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest {
val switch = MaxMinFlowMultiplexer(scheduler)
val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { switch.addOutput(it) }
+ sources.forEach { it.startConsumer(switch.newOutput()) }
val provider = switch.newInput()
val consumer = FixedFlowSource(2000.0, 1.0)
@@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val provider = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
provider.consume(workload)
yield()
} finally {
@@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val providerA = switch.newInput()
val providerB = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
index 2b7c1ad7..6667c80c 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
@@ -63,11 +63,9 @@ public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetwor
get() = _provider
private val _provider = mux.newInput()
- override fun createConsumer(): FlowSource {
- val forwarder = FlowForwarder(engine, isCoupled = true)
- mux.addOutput(forwarder)
- return forwarder
- }
+ private val _source = mux.newOutput()
+
+ override fun createConsumer(): FlowSource = _source
override fun close() {
isClosed = true
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index d536f22d..9f88fecc 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -46,18 +46,14 @@ public class SimPdu(
/**
* The [FlowForwarder] that represents the input of the PDU.
*/
- private val forwarder = FlowForwarder(engine)
+ private val output = mux.newOutput()
/**
* Create a new PDU outlet.
*/
public fun newOutlet(): Outlet = Outlet(mux, mux.newInput())
- init {
- mux.addOutput(forwarder)
- }
-
- override fun createSource(): FlowSource = FlowMapper(forwarder) { _, rate ->
+ override fun createSource(): FlowSource = FlowMapper(output) { _, rate ->
val loss = computePowerLoss(rate)
rate + loss
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
index 312f1d0f..46d659f8 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
@@ -42,19 +42,19 @@ public class SimUps(
/**
* The resource aggregator used to combine the input sources.
*/
- private val switch = MaxMinFlowMultiplexer(engine)
+ private val mux = MaxMinFlowMultiplexer(engine)
/**
* The [FlowConsumer] that represents the output of the UPS.
*/
- private val provider = switch.newInput()
+ private val provider = mux.newInput()
/**
* Create a new UPS outlet.
*/
public fun newInlet(): SimPowerInlet {
val forward = FlowForwarder(engine, isCoupled = true)
- switch.addOutput(forward)
+ forward.startConsumer(mux.newOutput())
return Inlet(forward)
}