diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-01 14:38:34 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-21 22:13:04 +0200 |
| commit | 44215bd668c5fa3efe2f57fc577824478b00af57 (patch) | |
| tree | b933228e5e5748716351dc9ce031b4840f254428 /opendc-experiments/opendc-experiments-tf20/src | |
| parent | c1f67a872e2d7ce63ac96f8ca80cbe8b25c62e3b (diff) | |
refactor(sim/compute): Re-implement using flow2
This change re-implements the OpenDC compute simulator framework using
the new flow2 framework for modelling multi-edge flow networks. The
re-implementation is written in Java and focusses on performance and
clean API surface.
Diffstat (limited to 'opendc-experiments/opendc-experiments-tf20/src')
5 files changed, 50 insertions, 52 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index c71c4520..a7fc102a 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -31,16 +31,17 @@ import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext +import org.opendc.simulator.compute.SimPsuFactories import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.PowerModel -import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.power.CpuPowerModel import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.FlowStage +import org.opendc.simulator.flow2.FlowStageLogic +import org.opendc.simulator.flow2.OutPort import java.time.Clock import java.util.ArrayDeque import java.util.UUID @@ -60,7 +61,7 @@ public class SimTFDevice( clock: Clock, pu: ProcessingUnit, private val memory: MemoryUnit, - powerModel: PowerModel + powerModel: CpuPowerModel ) : TFDevice { /** * The scope in which the device runs. @@ -70,25 +71,25 @@ public class SimTFDevice( /** * The [SimMachine] representing the device. */ - private val machine = SimBareMetalMachine( - FlowEngine(scope.coroutineContext, clock), + private val machine = SimBareMetalMachine.create( + FlowEngine.create(context, clock).newGraph(), MachineModel(listOf(pu), listOf(memory)), - SimplePowerDriver(powerModel) + SimPsuFactories.simple(powerModel) ) /** * The workload that will be run by the device. */ - private val workload = object : SimWorkload, FlowSource { + private val workload = object : SimWorkload, FlowStageLogic { /** - * The resource context to interrupt the workload with. + * The [FlowStage] of the workload. */ - var ctx: FlowConnection? = null + var stage: FlowStage? = null /** - * The capacity of the device. + * The output of the workload. */ - private var capacity: Double = 0.0 + private var output: OutPort? = null /** * The queue of work to run. @@ -112,37 +113,35 @@ public class SimTFDevice( private var lastPull: Long = 0L override fun onStart(ctx: SimMachineContext) { - for (cpu in ctx.cpus) { - cpu.startConsumer(this) - } - } + val stage = ctx.graph.newStage(this) + this.stage = stage + output = stage.getOutlet("out") + lastPull = ctx.graph.engine.clock.millis() - override fun onStop(ctx: SimMachineContext) {} + ctx.graph.connect(output, ctx.cpus[0].input) + } - override fun onStart(conn: FlowConnection, now: Long) { - ctx = conn - capacity = conn.capacity - lastPull = now - conn.shouldSourceConverge = false + override fun onStop(ctx: SimMachineContext) { + stage?.close() + stage = null + output = null } - override fun onPull(conn: FlowConnection, now: Long): Long { + override fun onUpdate(ctx: FlowStage, now: Long): Long { + val output = output ?: return Long.MAX_VALUE val lastPull = lastPull this.lastPull = now val delta = (now - lastPull).coerceAtLeast(0) - - val consumedWork = conn.rate * delta / 1000.0 - - capacity = conn.capacity + val consumedWork = output.rate * delta / 1000.0 val activeWork = activeWork if (activeWork != null) { if (activeWork.consume(consumedWork)) { this.activeWork = null } else { - val duration = ceil(activeWork.flops / conn.capacity * 1000).toLong() - conn.push(conn.capacity) - return duration + val duration = ceil(activeWork.flops / output.capacity * 1000).toLong() + output.push(output.capacity) + return now + duration } } @@ -150,11 +149,11 @@ public class SimTFDevice( val head = queue.poll() return if (head != null) { this.activeWork = head - val duration = (head.flops / conn.capacity * 1000).roundToLong() - conn.push(conn.capacity) - duration + val duration = (head.flops / output.capacity * 1000).roundToLong() + output.push(output.capacity) + now + duration } else { - conn.push(0.0) + output.push(0.0f) Long.MAX_VALUE } } @@ -174,13 +173,12 @@ public class SimTFDevice( override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont -> workload.queue.add(Work(flops, cont)) if (workload.isIdle) { - workload.ctx?.pull() + workload.stage?.invalidate() } } override fun getDeviceStats(): TFDeviceStats { - val resourceUsage = machine.cpus.sumOf { it.rate } - return TFDeviceStats(resourceUsage, machine.powerUsage, machine.energyUsage) + return TFDeviceStats(machine.cpuUsage, machine.psu.powerUsage, machine.psu.energyUsage) } override fun close() { diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt index 4913c019..2a7578b3 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.CpuPowerModels import java.io.InputStream import java.util.UUID @@ -102,7 +102,7 @@ public class MLEnvironmentReader { "node-${counter++}", mapOf("gpu" to isGpuFlag), MachineModel(cores, memories), - LinearPowerModel(maxPower, minPower) + CpuPowerModels.linear(maxPower, minPower) ) } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt index 63f00d53..6b72e155 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt @@ -23,7 +23,7 @@ package org.opendc.experiments.tf20.util import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.compute.power.CpuPowerModel import java.util.UUID /** @@ -34,5 +34,5 @@ public data class MachineDef( val name: String, val meta: Map<String, Any>, val model: MachineModel, - val powerModel: PowerModel + val powerModel: CpuPowerModel ) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt index c4698058..32f72686 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt @@ -29,7 +29,7 @@ import org.opendc.experiments.tf20.core.SimTFDevice import org.opendc.experiments.tf20.distribute.MirroredStrategy import org.opendc.experiments.tf20.distribute.OneDeviceStrategy import org.opendc.experiments.tf20.util.MLEnvironmentReader -import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.CpuPowerModels import org.opendc.simulator.kotlin.runSimulation import java.util.UUID @@ -52,7 +52,7 @@ class TensorFlowTest { clock, def.model.cpus[0], def.model.memory[0], - LinearPowerModel(250.0, 60.0) + CpuPowerModels.linear(250.0, 60.0) ) val strategy = OneDeviceStrategy(device) val batchSize = 32 @@ -87,7 +87,7 @@ class TensorFlowTest { clock, def.model.cpus[0], def.model.memory[0], - LinearPowerModel(250.0, 60.0) + CpuPowerModels.linear(250.0, 60.0) ) val strategy = OneDeviceStrategy(device) val batchSize = 128 @@ -102,8 +102,8 @@ class TensorFlowTest { val stats = device.getDeviceStats() assertAll( - { assertEquals(176230322904, clock.millis()) }, - { assertEquals(4.4057580726E10, stats.energyUsage) } + { assertEquals(176230328513, clock.millis()) }, + { assertEquals(4.405758212825E10, stats.energyUsage) } ) } @@ -122,7 +122,7 @@ class TensorFlowTest { clock, def.model.cpus[0], def.model.memory[0], - LinearPowerModel(250.0, 60.0) + CpuPowerModels.linear(250.0, 60.0) ) val deviceB = SimTFDevice( @@ -132,7 +132,7 @@ class TensorFlowTest { clock, def.model.cpus[0], def.model.memory[0], - LinearPowerModel(250.0, 60.0) + CpuPowerModels.linear(250.0, 60.0) ) val strategy = MirroredStrategy(listOf(deviceA, deviceB)) diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 85d63e9b..910cbcc9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -30,7 +30,7 @@ import org.junit.jupiter.api.Test import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.CpuPowerModels import org.opendc.simulator.kotlin.runSimulation import java.util.UUID @@ -51,7 +51,7 @@ internal class SimTFDeviceTest { clock, pu, memory, - LinearPowerModel(250.0, 100.0) + CpuPowerModels.linear(250.0, 100.0) ) // Load 1 GiB into GPU memory |
