summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 18:15:09 +0200
committerGitHub <noreply@github.com>2021-10-03 18:15:09 +0200
commitb92d0e8703014f143ff0b1fe67de09fff6f867b1 (patch)
tree34238f56af20f0eb697f25ad5a700bab7fa4d6fb /opendc-experiments
parent54bccf522e169d5cba6489291217f3307ae71094 (diff)
parent012fe8fa9be1676b8eef0cce795738a00c4260c0 (diff)
merge: Migrate to flow-based simulation for low-level models
This pull request converts the `opendc-simulator-resources` module into a flow simulator and adapts the existing low-level models (e.g., CPU, network, disk) to this new flow simulator. The flow simulator works differently from the uniform resource consumption model, in that it models flow through a system of connections, as opposed to resource consumptions. Concretely, this means that while in the uniform resource consumption model, consumptions with the same usage are propagated to the resources, in the flow simulator, only changes to the flow in the system are propagated. Overall, this leads to less updates in the system and therefore higher performance. The benchmarks shows that the new implementation obtains more than double the performance of the old implementation. We have focused in the new implementation on reducing the amount of work and memory allocations/loads/stores per updates. * Migrate from kotlinx-benchmark to jmh-gradle (for better profiling support) * Use longer traces for benchmarks (to prevent measuring the benchmark overhead) * Use direct field access for perf-sensitive code * Combine work and deadline to duration * Add support for pushing flow from context (to eliminate the allocation for every `SimResourceCommand`) * Reduce memory allocations in SimResourceInterpreter, by revamping the way timers are allocated. * Simplify max-min aggregator implementation (by utilizing the new push mechanism) * Invoke consumer callback on every invalidation (in order to propagate changes downstream) * Lazily push changes to resource context (by not updating the flow rate immediately after a push, but only after an update) * Remove onUpdate callback * Merge distributor and aggregator into switch * Separate push and pull flags * Remove failure callback from FlowSource * Create separate callbacks for remaining events * Make convergence callback optional * Reduce field accesses in FlowConsumerContextImpl * Optimize hot path in SimTraceWorkload * Expose CPU time counters directly on hypervisor * Optimize telemetry collection **Breaking API Changes** * The entire `opendc-simulator-resources` module has been replaced by the `opendc-simulator-flow` module. * `SimHypervisor.Listener` has been removed in favour of a new interface that exposes the performance counters of the hypervisor directly. To listen for convergence, use `FlowConvergenceListener`.
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt25
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt56
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt2
4 files changed, 45 insertions, 40 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 21ff3ab0..4e855f82 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -131,7 +131,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
// Instantiate the desired topology
runner.apply(topology)
- // Run the workload trace
+ // Converge the workload trace
runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 30cc1466..9d540118 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -116,11 +116,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(5.840207707767459E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -160,10 +160,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10998110, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9740290, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.009945802750012E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -209,10 +210,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(480866, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -252,8 +253,8 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
{ assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 0873aac9..fb36d2c7 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -35,12 +35,13 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
import java.time.Clock
import java.util.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
+import kotlin.math.roundToLong
/**
* A [TFDevice] implementation using simulated components.
@@ -64,7 +65,7 @@ public class SimTFDevice(
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine(
- SimResourceInterpreter(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)),
+ FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)),
SimplePowerDriver(powerModel)
)
@@ -94,11 +95,11 @@ public class SimTFDevice(
/**
* The workload that will be run by the device.
*/
- private val workload = object : SimWorkload, SimResourceConsumer {
+ private val workload = object : SimWorkload, FlowSource {
/**
* The resource context to interrupt the workload with.
*/
- var ctx: SimResourceContext? = null
+ var ctx: FlowConnection? = null
/**
* The capacity of the device.
@@ -127,13 +128,26 @@ public class SimTFDevice(
}
}
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ ctx = conn
+ capacity = conn.capacity
+
+ conn.shouldSourceConverge = true
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumedWork = conn.rate * delta / 1000.0
+
+ capacity = conn.capacity
+
val activeWork = activeWork
if (activeWork != null) {
- if (activeWork.consume(activeWork.flops - ctx.remainingWork)) {
+ if (activeWork.consume(consumedWork)) {
this.activeWork = null
} else {
- return SimResourceCommand.Consume(activeWork.flops, ctx.capacity)
+ val duration = (activeWork.flops / conn.capacity * 1000).roundToLong()
+ conn.push(conn.capacity)
+ return duration
}
}
@@ -141,28 +155,18 @@ public class SimTFDevice(
val head = queue.poll()
return if (head != null) {
this.activeWork = head
- SimResourceCommand.Consume(head.flops, ctx.capacity)
+ val duration = (head.flops / conn.capacity * 1000).roundToLong()
+ conn.push(conn.capacity)
+ duration
} else {
- SimResourceCommand.Idle()
+ conn.push(0.0)
+ Long.MAX_VALUE
}
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- this.capacity = ctx.capacity
- }
- SimResourceEvent.Capacity -> {
- this.capacity = ctx.capacity
- ctx.interrupt()
- }
- SimResourceEvent.Run -> {
- _usage.record(ctx.speed)
- _power.record(machine.psu.powerDraw)
- }
- else -> {}
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ _usage.record(conn.rate)
+ _power.record(machine.psu.powerDraw)
}
}
@@ -180,7 +184,7 @@ public class SimTFDevice(
override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont ->
workload.queue.add(Work(flops, cont))
if (workload.isIdle) {
- workload.ctx?.interrupt()
+ workload.ctx?.pull()
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
index 5839c0df..3e755b56 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
@@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute
*/
public interface Strategy {
/**
- * Run the specified batch using the given strategy.
+ * Converge the specified batch using the given strategy.
*/
public suspend fun run(forward: Double, backward: Double, batchSize: Int)
}