summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-tf20
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-01 14:38:34 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:13:04 +0200
commit44215bd668c5fa3efe2f57fc577824478b00af57 (patch)
treeb933228e5e5748716351dc9ce031b4840f254428 /opendc-experiments/opendc-experiments-tf20
parentc1f67a872e2d7ce63ac96f8ca80cbe8b25c62e3b (diff)
refactor(sim/compute): Re-implement using flow2
This change re-implements the OpenDC compute simulator framework using the new flow2 framework for modelling multi-edge flow networks. The re-implementation is written in Java and focusses on performance and clean API surface.
Diffstat (limited to 'opendc-experiments/opendc-experiments-tf20')
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt76
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt4
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt4
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt14
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt4
5 files changed, 50 insertions, 52 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index c71c4520..a7fc102a 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -31,16 +31,17 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.SimPsuFactories
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.PowerModel
-import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.power.CpuPowerModel
import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.FlowStage
+import org.opendc.simulator.flow2.FlowStageLogic
+import org.opendc.simulator.flow2.OutPort
import java.time.Clock
import java.util.ArrayDeque
import java.util.UUID
@@ -60,7 +61,7 @@ public class SimTFDevice(
clock: Clock,
pu: ProcessingUnit,
private val memory: MemoryUnit,
- powerModel: PowerModel
+ powerModel: CpuPowerModel
) : TFDevice {
/**
* The scope in which the device runs.
@@ -70,25 +71,25 @@ public class SimTFDevice(
/**
* The [SimMachine] representing the device.
*/
- private val machine = SimBareMetalMachine(
- FlowEngine(scope.coroutineContext, clock),
+ private val machine = SimBareMetalMachine.create(
+ FlowEngine.create(context, clock).newGraph(),
MachineModel(listOf(pu), listOf(memory)),
- SimplePowerDriver(powerModel)
+ SimPsuFactories.simple(powerModel)
)
/**
* The workload that will be run by the device.
*/
- private val workload = object : SimWorkload, FlowSource {
+ private val workload = object : SimWorkload, FlowStageLogic {
/**
- * The resource context to interrupt the workload with.
+ * The [FlowStage] of the workload.
*/
- var ctx: FlowConnection? = null
+ var stage: FlowStage? = null
/**
- * The capacity of the device.
+ * The output of the workload.
*/
- private var capacity: Double = 0.0
+ private var output: OutPort? = null
/**
* The queue of work to run.
@@ -112,37 +113,35 @@ public class SimTFDevice(
private var lastPull: Long = 0L
override fun onStart(ctx: SimMachineContext) {
- for (cpu in ctx.cpus) {
- cpu.startConsumer(this)
- }
- }
+ val stage = ctx.graph.newStage(this)
+ this.stage = stage
+ output = stage.getOutlet("out")
+ lastPull = ctx.graph.engine.clock.millis()
- override fun onStop(ctx: SimMachineContext) {}
+ ctx.graph.connect(output, ctx.cpus[0].input)
+ }
- override fun onStart(conn: FlowConnection, now: Long) {
- ctx = conn
- capacity = conn.capacity
- lastPull = now
- conn.shouldSourceConverge = false
+ override fun onStop(ctx: SimMachineContext) {
+ stage?.close()
+ stage = null
+ output = null
}
- override fun onPull(conn: FlowConnection, now: Long): Long {
+ override fun onUpdate(ctx: FlowStage, now: Long): Long {
+ val output = output ?: return Long.MAX_VALUE
val lastPull = lastPull
this.lastPull = now
val delta = (now - lastPull).coerceAtLeast(0)
-
- val consumedWork = conn.rate * delta / 1000.0
-
- capacity = conn.capacity
+ val consumedWork = output.rate * delta / 1000.0
val activeWork = activeWork
if (activeWork != null) {
if (activeWork.consume(consumedWork)) {
this.activeWork = null
} else {
- val duration = ceil(activeWork.flops / conn.capacity * 1000).toLong()
- conn.push(conn.capacity)
- return duration
+ val duration = ceil(activeWork.flops / output.capacity * 1000).toLong()
+ output.push(output.capacity)
+ return now + duration
}
}
@@ -150,11 +149,11 @@ public class SimTFDevice(
val head = queue.poll()
return if (head != null) {
this.activeWork = head
- val duration = (head.flops / conn.capacity * 1000).roundToLong()
- conn.push(conn.capacity)
- duration
+ val duration = (head.flops / output.capacity * 1000).roundToLong()
+ output.push(output.capacity)
+ now + duration
} else {
- conn.push(0.0)
+ output.push(0.0f)
Long.MAX_VALUE
}
}
@@ -174,13 +173,12 @@ public class SimTFDevice(
override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont ->
workload.queue.add(Work(flops, cont))
if (workload.isIdle) {
- workload.ctx?.pull()
+ workload.stage?.invalidate()
}
}
override fun getDeviceStats(): TFDeviceStats {
- val resourceUsage = machine.cpus.sumOf { it.rate }
- return TFDeviceStats(resourceUsage, machine.powerUsage, machine.energyUsage)
+ return TFDeviceStats(machine.cpuUsage, machine.psu.powerUsage, machine.psu.energyUsage)
}
override fun close() {
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
index 4913c019..2a7578b3 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
@@ -29,7 +29,7 @@ import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.CpuPowerModels
import java.io.InputStream
import java.util.UUID
@@ -102,7 +102,7 @@ public class MLEnvironmentReader {
"node-${counter++}",
mapOf("gpu" to isGpuFlag),
MachineModel(cores, memories),
- LinearPowerModel(maxPower, minPower)
+ CpuPowerModels.linear(maxPower, minPower)
)
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
index 63f00d53..6b72e155 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
@@ -23,7 +23,7 @@
package org.opendc.experiments.tf20.util
import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.CpuPowerModel
import java.util.UUID
/**
@@ -34,5 +34,5 @@ public data class MachineDef(
val name: String,
val meta: Map<String, Any>,
val model: MachineModel,
- val powerModel: PowerModel
+ val powerModel: CpuPowerModel
)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
index c4698058..32f72686 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/TensorFlowTest.kt
@@ -29,7 +29,7 @@ import org.opendc.experiments.tf20.core.SimTFDevice
import org.opendc.experiments.tf20.distribute.MirroredStrategy
import org.opendc.experiments.tf20.distribute.OneDeviceStrategy
import org.opendc.experiments.tf20.util.MLEnvironmentReader
-import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.CpuPowerModels
import org.opendc.simulator.kotlin.runSimulation
import java.util.UUID
@@ -52,7 +52,7 @@ class TensorFlowTest {
clock,
def.model.cpus[0],
def.model.memory[0],
- LinearPowerModel(250.0, 60.0)
+ CpuPowerModels.linear(250.0, 60.0)
)
val strategy = OneDeviceStrategy(device)
val batchSize = 32
@@ -87,7 +87,7 @@ class TensorFlowTest {
clock,
def.model.cpus[0],
def.model.memory[0],
- LinearPowerModel(250.0, 60.0)
+ CpuPowerModels.linear(250.0, 60.0)
)
val strategy = OneDeviceStrategy(device)
val batchSize = 128
@@ -102,8 +102,8 @@ class TensorFlowTest {
val stats = device.getDeviceStats()
assertAll(
- { assertEquals(176230322904, clock.millis()) },
- { assertEquals(4.4057580726E10, stats.energyUsage) }
+ { assertEquals(176230328513, clock.millis()) },
+ { assertEquals(4.405758212825E10, stats.energyUsage) }
)
}
@@ -122,7 +122,7 @@ class TensorFlowTest {
clock,
def.model.cpus[0],
def.model.memory[0],
- LinearPowerModel(250.0, 60.0)
+ CpuPowerModels.linear(250.0, 60.0)
)
val deviceB = SimTFDevice(
@@ -132,7 +132,7 @@ class TensorFlowTest {
clock,
def.model.cpus[0],
def.model.memory[0],
- LinearPowerModel(250.0, 60.0)
+ CpuPowerModels.linear(250.0, 60.0)
)
val strategy = MirroredStrategy(listOf(deviceA, deviceB))
diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
index 85d63e9b..910cbcc9 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt
@@ -30,7 +30,7 @@ import org.junit.jupiter.api.Test
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.compute.power.CpuPowerModels
import org.opendc.simulator.kotlin.runSimulation
import java.util.UUID
@@ -51,7 +51,7 @@ internal class SimTFDeviceTest {
clock,
pu,
memory,
- LinearPowerModel(250.0, 100.0)
+ CpuPowerModels.linear(250.0, 100.0)
)
// Load 1 GiB into GPU memory