diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-tf20/src')
7 files changed, 102 insertions, 56 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt index 9a48aced..2153a862 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt @@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") { .build() val meter = meterProvider.get("opendc-tf20") - val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first() + val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)) + val def = MLEnvironmentReader().readEnvironment(envInput).first() val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0], def.model.memory[0], LinearPowerModel(250.0, 60.0) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index f4c18ff1..fb36d2c7 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,28 +22,26 @@ package org.opendc.experiments.tf20.core +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceCommand -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.* import java.time.Clock import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume +import kotlin.math.roundToLong /** * A [TFDevice] implementation using simulated components. @@ -67,36 +65,41 @@ public class SimTFDevice( * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine( - scope.coroutineContext, clock, SimMachineModel(listOf(pu), listOf(memory)), - PerformanceScalingGovernor(), SimpleScalingDriver(powerModel) + FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)), + SimplePowerDriver(powerModel) ) /** + * The identifier of a device. + */ + private val deviceId = AttributeKey.stringKey("device.id") + + /** * The usage of the device. */ - private val _usage = meter.doubleValueRecorderBuilder("device.usage") + private val _usage = meter.histogramBuilder("device.usage") .setDescription("The amount of device resources used") .setUnit("MHz") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The power draw of the device. */ - private val _power = meter.doubleValueRecorderBuilder("device.power") + private val _power = meter.histogramBuilder("device.power") .setDescription("The power draw of the device") .setUnit("W") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The workload that will be run by the device. */ - private val workload = object : SimWorkload, SimResourceConsumer { + private val workload = object : SimWorkload, FlowSource { /** * The resource context to interrupt the workload with. */ - var ctx: SimResourceContext? = null + var ctx: FlowConnection? = null /** * The capacity of the device. @@ -119,17 +122,32 @@ public class SimTFDevice( */ private var activeWork: Work? = null - override fun onStart(ctx: SimMachineContext) {} + override fun onStart(ctx: SimMachineContext) { + for (cpu in ctx.cpus) { + cpu.startConsumer(this) + } + } + + override fun onStart(conn: FlowConnection, now: Long) { + ctx = conn + capacity = conn.capacity - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer = this + conn.shouldSourceConverge = true + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumedWork = conn.rate * delta / 1000.0 + + capacity = conn.capacity - override fun onNext(ctx: SimResourceContext): SimResourceCommand { val activeWork = activeWork if (activeWork != null) { - if (activeWork.consume(activeWork.flops - ctx.remainingWork)) { + if (activeWork.consume(consumedWork)) { this.activeWork = null } else { - return SimResourceCommand.Consume(activeWork.flops, ctx.capacity) + val duration = (activeWork.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) + return duration } } @@ -137,28 +155,18 @@ public class SimTFDevice( val head = queue.poll() return if (head != null) { this.activeWork = head - SimResourceCommand.Consume(head.flops, ctx.capacity) + val duration = (head.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) + duration } else { - SimResourceCommand.Idle() + conn.push(0.0) + Long.MAX_VALUE } } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - this.capacity = ctx.capacity - } - SimResourceEvent.Capacity -> { - this.capacity = ctx.capacity - ctx.interrupt() - } - SimResourceEvent.Run -> { - _usage.record(ctx.speed) - _power.record(machine.powerDraw) - } - else -> {} - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + _usage.record(conn.rate) + _power.record(machine.psu.powerDraw) } } @@ -176,7 +184,7 @@ public class SimTFDevice( override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont -> workload.queue.add(Work(flops, cont)) if (workload.isIdle) { - workload.ctx?.interrupt() + workload.ctx?.pull() } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt index 5839c0df..3e755b56 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt @@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute */ public interface Strategy { /** - * Run the specified batch using the given strategy. + * Converge the specified batch using the given strategy. */ public suspend fun run(forward: Double, backward: Double, batchSize: Int) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt index 411ddb59..83995fa1 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt @@ -49,10 +49,10 @@ public class Sequential(vararg layers: Layer) : TrainableModel(*layers) { } override fun forward(): Double { - return layers.sumByDouble { it.forward() } + return layers.sumOf { it.forward() } } override fun backward(): Double { - return layers.sumByDouble { it.backward() } + return layers.sumOf { it.backward() } } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 75b11423..9771cc20 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -82,7 +82,7 @@ public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCl val target = channels[to] ?: return // Drop if destination not found - scheduler.startSingleTimer(message, delayTime) { target.offer(message) } + scheduler.startSingleTimer(message, delayTime) { target.trySend(message) } } /** diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt index eea079fb..3cdf28fd 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt @@ -25,9 +25,7 @@ package org.opendc.experiments.tf20.util import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit @@ -36,13 +34,16 @@ import java.io.InputStream import java.util.* /** - * An [EnvironmentReader] for the TensorFlow experiments. + * An environment reader for the TensorFlow experiments. */ -public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { +public class MLEnvironmentReader { + /** + * The [ObjectMapper] to convert the format. + */ + private val mapper = jacksonObjectMapper() - private val setup: Setup = mapper.readValue(input) - - override fun read(): List<MachineDef> { + public fun readEnvironment(input: InputStream): List<MachineDef> { + val setup: Setup = mapper.readValue(input) var counter = 0 return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -100,7 +101,7 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack UUID(0, counter.toLong()), "node-${counter++}", mapOf("gpu" to isGpuFlag), - SimMachineModel(cores, memories), + MachineModel(cores, memories), LinearPowerModel(maxPower, minPower) ) } @@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack } } } - - override fun close() {} } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt new file mode 100644 index 00000000..271f5923 --- /dev/null +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.tf20.util + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +/** + * A definition of a machine in a cluster. + */ +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerModel: PowerModel +) |
