diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-21 22:32:05 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-21 22:32:05 +0200 |
| commit | fa7fdbb0126ea465130961dc37c4ef2d6feb36e9 (patch) | |
| tree | 9cd46dd7970870b78990d6c35e8e2759d7cf5a13 /opendc-compute | |
| parent | 29beb50018cf2ad87b252c6c080f8c5de4600349 (diff) | |
| parent | 290e1fe14460d91e4703e55ac5f05dbe7b4505f7 (diff) | |
merge: Implement multi-flow stages in simulator (#110)
This pull request introduces the new `flow2` multi-flow simulator into the OpenDC codebase
and adjust all existing modules to make use of this new simulator.
The new simulator models flow as a network of components, which can each receive flow
from (potentially) multiple other components. In the previous simulator, the framework itself
supported only single flows between components and required re-implementation of many
components to support multiplexing flows.
Initial benchmarks show performance improvements in the range 2x–4x for large scale experiments
such as the Capelin benchmarks.
## Implementation Notes :hammer_and_pick:
* Add support for multi-flow stages
* Support flow transformations
* Add forwarding flow multiplexer
* Expose metrics on FlowMultiplexer
* Re-implement network sim using flow2
* Re-implement power sim using flow2
* Re-implement compute sim using flow2
* Optimize workload implementation of SimTrace
* Remove old flow simulator
* Add log4j-core dependency
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* Removal of the `org.opendc.simulator.flow` package. You should now use
the new flow simulator located in `org.opendc.simulator.flow2`.
* `PowerModel` interface is replaced by the `CpuPowerModel` interface.
* `PowerDriver` interface is replaced by the `SimPsu` and `SimPsuFactory` interfaces.
* Removal of `SimTraceWorkload`. Instead, create a workload from a `SimTrace` using
`createWorkload(offset)`.
* `ScalingGovernor` has been split in a `ScalingGovernor` and `ScalingGovernorFactory`.
* All modules in `opendc-simulator` are now written in Java. This means that default
parameters are not supported anymore for these modules.
Diffstat (limited to 'opendc-compute')
4 files changed, 66 insertions, 56 deletions
diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index fd15b6e7..1a73201e 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -33,5 +33,6 @@ dependencies { implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) + testRuntimeOnly(libs.log4j.core) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 9969ac2b..c07649bd 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -41,8 +41,10 @@ import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import java.time.Clock +import org.opendc.simulator.flow2.FlowGraph import java.time.Duration import java.time.Instant import java.util.UUID @@ -56,13 +58,18 @@ public class SimHost( override val name: String, override val meta: Map<String, Any>, private val context: CoroutineContext, - private val clock: Clock, + graph: FlowGraph, private val machine: SimBareMetalMachine, private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), private val optimize: Boolean = false ) : Host, AutoCloseable { /** + * The clock instance used by the host. + */ + private val clock = graph.engine.clock + + /** * The event listeners registered with this host. */ private val listeners = mutableListOf<HostListener>() @@ -201,8 +208,8 @@ public class SimHost( Duration.ofMillis(_uptime), Duration.ofMillis(_downtime), _bootTime, - machine.powerUsage, - machine.energyUsage, + machine.psu.powerUsage, + machine.psu.energyUsage, terminated, running, error, @@ -217,7 +224,7 @@ public class SimHost( override fun getCpuStats(): HostCpuStats { val counters = hypervisor.counters - counters.flush() + counters.sync() return HostCpuStats( counters.cpuActiveTime / 1000L, @@ -277,27 +284,30 @@ public class SimHost( check(_ctx == null) { "Concurrent hypervisor running" } // Launch hypervisor onto machine - _ctx = machine.startWorkload(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - _bootTime = clock.instant() - _state = HostState.UP - hypervisor.onStart(ctx) - } catch (cause: Throwable) { - _state = HostState.ERROR - _ctx = null - throw cause + _ctx = machine.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + _bootTime = clock.instant() + _state = HostState.UP + hypervisor.onStart(ctx) + } catch (cause: Throwable) { + _state = HostState.ERROR + _ctx = null + throw cause + } } - } - override fun onStop(ctx: SimMachineContext) { - try { - hypervisor.onStop(ctx) - } finally { - _ctx = null + override fun onStop(ctx: SimMachineContext) { + try { + hypervisor.onStop(ctx) + } finally { + _ctx = null + } } - } - }) + }, + emptyMap() + ) } /** @@ -307,7 +317,7 @@ public class SimHost( updateUptime() // Stop the hypervisor - _ctx?.close() + _ctx?.shutdown() _state = state } @@ -316,9 +326,10 @@ public class SimHost( */ private fun Flavor.toMachineModel(): MachineModel { val originalCpu = machine.model.cpus[0] + val originalNode = originalCpu.node val cpuCapacity = (this.meta["cpu-capacity"] as? Double ?: Double.MAX_VALUE).coerceAtMost(originalCpu.frequency) - val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) } + val processingNode = ProcessingNode(originalNode.vendor, originalNode.modelName, originalNode.architecture, cpuCount) + val processingUnits = (0 until cpuCount).map { ProcessingUnit(processingNode, it, cpuCapacity) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) val model = MachineModel(processingUnits, memoryUnits) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 6b74fa3a..790d8047 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -161,7 +161,7 @@ internal class Guest( */ fun getCpuStats(): GuestCpuStats { val counters = machine.counters - counters.flush() + counters.sync() return GuestCpuStats( counters.cpuActiveTime / 1000L, diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 6be1f3c0..a5999bcd 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -43,13 +43,10 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory import org.opendc.simulator.kotlin.runSimulation import java.time.Instant import java.util.SplittableRandom @@ -67,8 +64,8 @@ internal class SimHostTest { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = MachineModel( - cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, - memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) } @@ -78,15 +75,19 @@ internal class SimHostTest { @Test fun testOvercommitted() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) + + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) + val host = SimHost( uid = UUID.randomUUID(), name = "test", meta = emptyMap(), coroutineContext, - clock, + graph, machine, hypervisor ) @@ -95,15 +96,13 @@ internal class SimHostTest { "<unnamed>", emptyMap(), mapOf( - "workload" to SimTraceWorkload( + "workload" to SimTrace.ofFragments( SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) - ), - offset = 1 - ) + ).createWorkload(1) ) ) val vmImageB = MockImage( @@ -111,15 +110,13 @@ internal class SimHostTest { "<unnamed>", emptyMap(), mapOf( - "workload" to SimTraceWorkload( + "workload" to SimTrace.ofFragments( SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) - ), - offset = 1 - ) + ).createWorkload(1) ) ) @@ -129,7 +126,7 @@ internal class SimHostTest { launch { host.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } launch { host.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } - suspendCancellableCoroutine<Unit> { cont -> + suspendCancellableCoroutine { cont -> host.addListener(object : HostListener { private var finished = 0 @@ -162,15 +159,18 @@ internal class SimHostTest { @Test fun testFailure() = runSimulation { val duration = 5 * 60L - val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) + + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val hypervisor = SimHypervisor.create(FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1)) val host = SimHost( uid = UUID.randomUUID(), name = "test", meta = emptyMap(), coroutineContext, - clock, + graph, machine, hypervisor ) @@ -179,15 +179,13 @@ internal class SimHostTest { "<unnamed>", emptyMap(), mapOf( - "workload" to SimTraceWorkload( + "workload" to SimTrace.ofFragments( SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) - ), - offset = 1 - ) + ).createWorkload(1) ) ) val flavor = MockFlavor(2, 0) @@ -220,7 +218,7 @@ internal class SimHostTest { val guestSysStats = host.getSystemStats(server) assertAll( - { assertEquals(1775, cpuStats.idleTime, "Idle time does not match") }, + { assertEquals(1175, cpuStats.idleTime, "Idle time does not match") }, { assertEquals(624, cpuStats.activeTime, "Active time does not match") }, { assertEquals(900001, sysStats.uptime.toMillis(), "Uptime does not match") }, { assertEquals(300000, sysStats.downtime.toMillis(), "Downtime does not match") }, |
