diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
| commit | aa9b32f8cd1467e9718959f400f6777e5d71737d (patch) | |
| tree | b88bbede15108c6855d7f94ded4c7054df186a72 /opendc-experiments/opendc-experiments-tf20/src/main | |
| parent | eb0e0a3bc557c05a70eead388797ab850ea87366 (diff) | |
| parent | b7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff) | |
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of
OpenDC into the public Github repository in order to sync the progress
of both repositories.
Diffstat (limited to 'opendc-experiments/opendc-experiments-tf20/src/main')
7 files changed, 102 insertions, 56 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt index 9a48aced..2153a862 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt @@ -55,7 +55,8 @@ public class TensorFlowExperiment : Experiment(name = "tf20") { .build() val meter = meterProvider.get("opendc-tf20") - val def = MLEnvironmentReader(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)).read().first() + val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)) + val def = MLEnvironmentReader().readEnvironment(envInput).first() val device = SimTFDevice( def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0], def.model.memory[0], LinearPowerModel(250.0, 60.0) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index f4c18ff1..fb36d2c7 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,28 +22,26 @@ package org.opendc.experiments.tf20.core +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.SimMachineModel -import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceCommand -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.* import java.time.Clock import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume +import kotlin.math.roundToLong /** * A [TFDevice] implementation using simulated components. @@ -67,36 +65,41 @@ public class SimTFDevice( * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine( - scope.coroutineContext, clock, SimMachineModel(listOf(pu), listOf(memory)), - PerformanceScalingGovernor(), SimpleScalingDriver(powerModel) + FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)), + SimplePowerDriver(powerModel) ) /** + * The identifier of a device. + */ + private val deviceId = AttributeKey.stringKey("device.id") + + /** * The usage of the device. */ - private val _usage = meter.doubleValueRecorderBuilder("device.usage") + private val _usage = meter.histogramBuilder("device.usage") .setDescription("The amount of device resources used") .setUnit("MHz") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The power draw of the device. */ - private val _power = meter.doubleValueRecorderBuilder("device.power") + private val _power = meter.histogramBuilder("device.power") .setDescription("The power draw of the device") .setUnit("W") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The workload that will be run by the device. */ - private val workload = object : SimWorkload, SimResourceConsumer { + private val workload = object : SimWorkload, FlowSource { /** * The resource context to interrupt the workload with. */ - var ctx: SimResourceContext? = null + var ctx: FlowConnection? = null /** * The capacity of the device. @@ -119,17 +122,32 @@ public class SimTFDevice( */ private var activeWork: Work? = null - override fun onStart(ctx: SimMachineContext) {} + override fun onStart(ctx: SimMachineContext) { + for (cpu in ctx.cpus) { + cpu.startConsumer(this) + } + } + + override fun onStart(conn: FlowConnection, now: Long) { + ctx = conn + capacity = conn.capacity - override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer = this + conn.shouldSourceConverge = true + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumedWork = conn.rate * delta / 1000.0 + + capacity = conn.capacity - override fun onNext(ctx: SimResourceContext): SimResourceCommand { val activeWork = activeWork if (activeWork != null) { - if (activeWork.consume(activeWork.flops - ctx.remainingWork)) { + if (activeWork.consume(consumedWork)) { this.activeWork = null } else { - return SimResourceCommand.Consume(activeWork.flops, ctx.capacity) + val duration = (activeWork.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) + return duration } } @@ -137,28 +155,18 @@ public class SimTFDevice( val head = queue.poll() return if (head != null) { this.activeWork = head - SimResourceCommand.Consume(head.flops, ctx.capacity) + val duration = (head.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) + duration } else { - SimResourceCommand.Idle() + conn.push(0.0) + Long.MAX_VALUE } } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - this.capacity = ctx.capacity - } - SimResourceEvent.Capacity -> { - this.capacity = ctx.capacity - ctx.interrupt() - } - SimResourceEvent.Run -> { - _usage.record(ctx.speed) - _power.record(machine.powerDraw) - } - else -> {} - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + _usage.record(conn.rate) + _power.record(machine.psu.powerDraw) } } @@ -176,7 +184,7 @@ public class SimTFDevice( override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont -> workload.queue.add(Work(flops, cont)) if (workload.isIdle) { - workload.ctx?.interrupt() + workload.ctx?.pull() } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt index 5839c0df..3e755b56 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt @@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute */ public interface Strategy { /** - * Run the specified batch using the given strategy. + * Converge the specified batch using the given strategy. */ public suspend fun run(forward: Double, backward: Double, batchSize: Int) } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt index 411ddb59..83995fa1 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/keras/Sequential.kt @@ -49,10 +49,10 @@ public class Sequential(vararg layers: Layer) : TrainableModel(*layers) { } override fun forward(): Double { - return layers.sumByDouble { it.forward() } + return layers.sumOf { it.forward() } } override fun backward(): Double { - return layers.sumByDouble { it.backward() } + return layers.sumOf { it.backward() } } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt index 75b11423..9771cc20 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/network/NetworkController.kt @@ -82,7 +82,7 @@ public class NetworkController(context: CoroutineContext, clock: Clock) : AutoCl val target = channels[to] ?: return // Drop if destination not found - scheduler.startSingleTimer(message, delayTime) { target.offer(message) } + scheduler.startSingleTimer(message, delayTime) { target.trySend(message) } } /** diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt index eea079fb..3cdf28fd 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MLEnvironmentReader.kt @@ -25,9 +25,7 @@ package org.opendc.experiments.tf20.util import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit @@ -36,13 +34,16 @@ import java.io.InputStream import java.util.* /** - * An [EnvironmentReader] for the TensorFlow experiments. + * An environment reader for the TensorFlow experiments. */ -public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { +public class MLEnvironmentReader { + /** + * The [ObjectMapper] to convert the format. + */ + private val mapper = jacksonObjectMapper() - private val setup: Setup = mapper.readValue(input) - - override fun read(): List<MachineDef> { + public fun readEnvironment(input: InputStream): List<MachineDef> { + val setup: Setup = mapper.readValue(input) var counter = 0 return setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> @@ -100,7 +101,7 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack UUID(0, counter.toLong()), "node-${counter++}", mapOf("gpu" to isGpuFlag), - SimMachineModel(cores, memories), + MachineModel(cores, memories), LinearPowerModel(maxPower, minPower) ) } @@ -109,6 +110,4 @@ public class MLEnvironmentReader(input: InputStream, mapper: ObjectMapper = jack } } } - - override fun close() {} } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt new file mode 100644 index 00000000..271f5923 --- /dev/null +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/util/MachineDef.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.tf20.util + +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.power.PowerModel +import java.util.* + +/** + * A definition of a machine in a cluster. + */ +public data class MachineDef( + val uid: UUID, + val name: String, + val meta: Map<String, Any>, + val model: MachineModel, + val powerModel: PowerModel +) |
