diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 18:15:09 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-10-03 18:15:09 +0200 |
| commit | b92d0e8703014f143ff0b1fe67de09fff6f867b1 (patch) | |
| tree | 34238f56af20f0eb697f25ad5a700bab7fa4d6fb /opendc-experiments | |
| parent | 54bccf522e169d5cba6489291217f3307ae71094 (diff) | |
| parent | 012fe8fa9be1676b8eef0cce795738a00c4260c0 (diff) | |
merge: Migrate to flow-based simulation for low-level models
This pull request converts the `opendc-simulator-resources` module into a flow
simulator and adapts the existing low-level models (e.g., CPU, network, disk)
to this new flow simulator.
The flow simulator works differently from the uniform resource consumption
model, in that it models flow through a system of connections, as opposed to
resource consumptions. Concretely, this means that while in the uniform
resource consumption model, consumptions with the same usage are propagated to
the resources, in the flow simulator, only changes to the flow in the system
are propagated.
Overall, this leads to less updates in the system and therefore higher
performance. The benchmarks shows that the new implementation obtains more than
double the performance of the old implementation. We have focused in the new
implementation on reducing the amount of work and memory
allocations/loads/stores per updates.
* Migrate from kotlinx-benchmark to jmh-gradle (for better profiling support)
* Use longer traces for benchmarks (to prevent measuring the benchmark
overhead)
* Use direct field access for perf-sensitive code
* Combine work and deadline to duration
* Add support for pushing flow from context (to eliminate the allocation for
every `SimResourceCommand`)
* Reduce memory allocations in SimResourceInterpreter, by revamping the way
timers are allocated.
* Simplify max-min aggregator implementation (by utilizing the new push
mechanism)
* Invoke consumer callback on every invalidation (in order to propagate changes
downstream)
* Lazily push changes to resource context (by not updating the flow rate
immediately after a push, but only after an update)
* Remove onUpdate callback
* Merge distributor and aggregator into switch
* Separate push and pull flags
* Remove failure callback from FlowSource
* Create separate callbacks for remaining events
* Make convergence callback optional
* Reduce field accesses in FlowConsumerContextImpl
* Optimize hot path in SimTraceWorkload
* Expose CPU time counters directly on hypervisor
* Optimize telemetry collection
**Breaking API Changes**
* The entire `opendc-simulator-resources` module has been replaced by the
`opendc-simulator-flow` module.
* `SimHypervisor.Listener` has been removed in favour of a new interface that
exposes the performance counters of the hypervisor directly. To listen for
convergence, use `FlowConvergenceListener`.
Diffstat (limited to 'opendc-experiments')
4 files changed, 45 insertions, 40 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 21ff3ab0..4e855f82 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -131,7 +131,7 @@ abstract class Portfolio(name: String) : Experiment(name) { // Instantiate the desired topology runner.apply(topology) - // Run the workload trace + // Converge the workload trace runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { runner.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 30cc1466..9d540118 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -116,11 +116,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.840207707767459E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -160,10 +160,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10998110, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740290, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.009945802750012E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -209,10 +210,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(480866, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -252,8 +253,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 0873aac9..fb36d2c7 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -35,12 +35,13 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* import java.time.Clock import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume +import kotlin.math.roundToLong /** * A [TFDevice] implementation using simulated components. @@ -64,7 +65,7 @@ public class SimTFDevice( * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine( - SimResourceInterpreter(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)), + FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)), SimplePowerDriver(powerModel) ) @@ -94,11 +95,11 @@ public class SimTFDevice( /** * The workload that will be run by the device. */ - private val workload = object : SimWorkload, SimResourceConsumer { + private val workload = object : SimWorkload, FlowSource { /** * The resource context to interrupt the workload with. */ - var ctx: SimResourceContext? = null + var ctx: FlowConnection? = null /** * The capacity of the device. @@ -127,13 +128,26 @@ public class SimTFDevice( } } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onStart(conn: FlowConnection, now: Long) { + ctx = conn + capacity = conn.capacity + + conn.shouldSourceConverge = true + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumedWork = conn.rate * delta / 1000.0 + + capacity = conn.capacity + val activeWork = activeWork if (activeWork != null) { - if (activeWork.consume(activeWork.flops - ctx.remainingWork)) { + if (activeWork.consume(consumedWork)) { this.activeWork = null } else { - return SimResourceCommand.Consume(activeWork.flops, ctx.capacity) + val duration = (activeWork.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) + return duration } } @@ -141,28 +155,18 @@ public class SimTFDevice( val head = queue.poll() return if (head != null) { this.activeWork = head - SimResourceCommand.Consume(head.flops, ctx.capacity) + val duration = (head.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) + duration } else { - SimResourceCommand.Idle() + conn.push(0.0) + Long.MAX_VALUE } } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - this.capacity = ctx.capacity - } - SimResourceEvent.Capacity -> { - this.capacity = ctx.capacity - ctx.interrupt() - } - SimResourceEvent.Run -> { - _usage.record(ctx.speed) - _power.record(machine.psu.powerDraw) - } - else -> {} - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + _usage.record(conn.rate) + _power.record(machine.psu.powerDraw) } } @@ -180,7 +184,7 @@ public class SimTFDevice( override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont -> workload.queue.add(Work(flops, cont)) if (workload.isIdle) { - workload.ctx?.interrupt() + workload.ctx?.pull() } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt index 5839c0df..3e755b56 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt @@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute */ public interface Strategy { /** - * Run the specified batch using the given strategy. + * Converge the specified batch using the given strategy. */ public suspend fun run(forward: Double, backward: Double, batchSize: Int) } |
