summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-tf20/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-tf20/src')
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt3
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt88
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt2
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt4
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt2
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt21
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt38
7 files changed, 102 insertions, 56 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
index 9a48aced..2153a862 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt
@@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") {
.build()
val meter = meterProvider.get("opendc-tf20")
- val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first()
+ val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile))
+ val def = MLEnvironmentReader().readEnvironment(envInput).first()
val device = SimTFDevice(
def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0],
def.model.memory[0], LinearPowerModel(250.0, 60.0)
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index f4c18ff1..fb36d2c7 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -22,28 +22,26 @@
package org.opendc.experiments.tf20.core
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.common.Labels
import kotlinx.coroutines.*
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.SimMachineModel
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
+import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceCommand
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.*
import java.time.Clock
import java.util.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
+import kotlin.math.roundToLong
/**
* A [TFDevice] implementation using simulated components.
@@ -67,36 +65,41 @@ public class SimTFDevice(
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine(
- scope.coroutineContext, clock, SimMachineModel(listOf(pu), listOf(memory)),
- PerformanceScalingGovernor(), SimpleScalingDriver(powerModel)
+ FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)),
+ SimplePowerDriver(powerModel)
)
/**
+ * The identifier of a device.
+ */
+ private val deviceId = AttributeKey.stringKey("device.id")
+
+ /**
* The usage of the device.
*/
- private val _usage = meter.doubleValueRecorderBuilder("device.usage")
+ private val _usage = meter.histogramBuilder("device.usage")
.setDescription("The amount of device resources used")
.setUnit("MHz")
.build()
- .bind(Labels.of("device", uid.toString()))
+ .bind(Attributes.of(deviceId, uid.toString()))
/**
* The power draw of the device.
*/
- private val _power = meter.doubleValueRecorderBuilder("device.power")
+ private val _power = meter.histogramBuilder("device.power")
.setDescription("The power draw of the device")
.setUnit("W")
.build()
- .bind(Labels.of("device", uid.toString()))
+ .bind(Attributes.of(deviceId, uid.toString()))
/**
* The workload that will be run by the device.
*/
- private val workload = object : SimWorkload, SimResourceConsumer {
+ private val workload = object : SimWorkload, FlowSource {
/**
* The resource context to interrupt the workload with.
*/
- var ctx: SimResourceContext? = null
+ var ctx: FlowConnection? = null
/**
* The capacity of the device.
@@ -119,17 +122,32 @@ public class SimTFDevice(
*/
private var activeWork: Work? = null
- override fun onStart(ctx: SimMachineContext) {}
+ override fun onStart(ctx: SimMachineContext) {
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(this)
+ }
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ ctx = conn
+ capacity = conn.capacity
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer = this
+ conn.shouldSourceConverge = true
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumedWork = conn.rate * delta / 1000.0
+
+ capacity = conn.capacity
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val activeWork = activeWork
if (activeWork != null) {
- if (activeWork.consume(activeWork.flops - ctx.remainingWork)) {
+ if (activeWork.consume(consumedWork)) {
this.activeWork = null
} else {
- return SimResourceCommand.Consume(activeWork.flops, ctx.capacity)
+ val duration = (activeWork.flops / conn.capacity * 1000).roundToLong()
+ conn.push(conn.capacity)
+ return duration
}
}
@@ -137,28 +155,18 @@ public class SimTFDevice(
val head = queue.poll()
return if (head != null) {
this.activeWork = head
- SimResourceCommand.Consume(head.flops, ctx.capacity)
+ val duration = (head.flops / conn.capacity * 1000).roundToLong()
+ conn.push(conn.capacity)
+ duration
} else {
- SimResourceCommand.Idle()
+ conn.push(0.0)
+ Long.MAX_VALUE
}
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- this.capacity = ctx.capacity
- }
- SimResourceEvent.Capacity -> {
- this.capacity = ctx.capacity
- ctx.interrupt()
- }
- SimResourceEvent.Run -> {
- _usage.record(ctx.speed)
- _power.record(machine.powerDraw)
- }
- else -> {}
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ _usage.record(conn.rate)
+ _power.record(machine.psu.powerDraw)
}
}
@@ -176,7 +184,7 @@ public class SimTFDevice(
override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont ->
workload.queue.add(Work(flops, cont))
if (workload.isIdle) {
- workload.ctx?.interrupt()
+ workload.ctx?.pull()
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
index 5839c0df..3e755b56 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
@@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute
*/
public interface Strategy {
/**
- * Run the specified batch using the given strategy.
+ * Converge the specified batch using the given strategy.
*/
public suspend fun run(forward: Double, backward: Double, batchSize: Int)
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt
index 411ddb59..83995fa1 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt
@@ -49,10 +49,10 @@ public class Sequential(vararg layers: Layer) : TrainableModel(*layers) {
}
override fun forward(): Double {
- return layers.sumByDouble { it.forward() }
+ return layers.sumOf { it.forward() }
}
override fun backward(): Double {
- return layers.sumByDouble { it.backward() }
+ return layers.sumOf { it.backward() }
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
index 75b11423..9771cc20 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt
@@ -82,7 +82,7 @@ public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCl
val target = channels[to] ?: return // Drop if destination not found
- scheduler.startSingleTimer(message, delayTime) { target.offer(message) }
+ scheduler.startSingleTimer(message, delayTime) { target.trySend(message) }
}
/**
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
index eea079fb..3cdf28fd 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt
@@ -25,9 +25,7 @@ package org.opendc.experiments.tf20.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
-import org.opendc.format.environment.EnvironmentReader
-import org.opendc.format.environment.MachineDef
-import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
@@ -36,13 +34,16 @@ import java.io.InputStream
import java.util.*
/**
- * An [EnvironmentReader] for the TensorFlow experiments.
+ * An environment reader for the TensorFlow experiments.
*/
-public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader {
+public class MLEnvironmentReader {
+ /**
+ * The [ObjectMapper] to convert the format.
+ */
+ private val mapper = jacksonObjectMapper()
- private val setup: Setup = mapper.readValue(input)
-
- override fun read(): List<MachineDef> {
+ public fun readEnvironment(input: InputStream): List<MachineDef> {
+ val setup: Setup = mapper.readValue(input)
var counter = 0
return setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
@@ -100,7 +101,7 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack
UUID(0, counter.toLong()),
"node-${counter++}",
mapOf("gpu" to isGpuFlag),
- SimMachineModel(cores, memories),
+ MachineModel(cores, memories),
LinearPowerModel(maxPower, minPower)
)
}
@@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack
}
}
}
-
- override fun close() {}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
new file mode 100644
index 00000000..271f5923
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.tf20.util
+
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.power.PowerModel
+import java.util.*
+
+/**
+ * A definition of a machine in a cluster.
+ */
+public data class MachineDef(
+ val uid: UUID,
+ val name: String,
+ val meta: Map<String, Any>,
+ val model: MachineModel,
+ val powerModel: PowerModel
+)