summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-02 23:06:36 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-02 23:06:36 +0200
commit1ee02377aca356a99506ac247b376d7cf070048d (patch)
tree2d68f36d8836f127e2bb37d28540053e51809b08 /opendc-simulator/opendc-simulator-compute/src
parentcc87c9ad0b8e4ed3fa4fbad4ab94c5e53948ef3c (diff)
simulator: Start consumers directly from workload
This change updates the SimWorkload interfaces to allow implementations to start consumers for the machine resource providers directly.
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt115
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt104
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt51
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt76
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt55
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt9
9 files changed, 228 insertions, 221 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index 2b84a17c..68ecc49f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -22,17 +22,10 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.launch
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
import org.opendc.simulator.resources.SimResourceSwitch
-import java.time.Clock
/**
* Abstract implementation of the [SimHypervisor] interface.
@@ -86,117 +79,37 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource
* @property performanceInterferenceModel The performance interference model to utilize.
*/
private inner class VirtualMachine(
- override val model: SimMachineModel,
+ model: SimMachineModel,
val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- ) : SimMachine {
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
- override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- /**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
+ ) : SimAbstractMachine(interpreter, parent = null, model) {
/**
* The vCPUs of the machine.
*/
- private val cpus = model.cpus.map { ProcessingUnitImpl(it, switch) }
-
- /**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- coroutineScope {
- require(!isTerminated) { "Machine is terminated" }
+ override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) }
- val ctx = object : SimMachineContext {
- override val cpus: List<SimProcessingUnit> = this@VirtualMachine.cpus
-
- override val memory: List<MemoryUnit>
- get() = model.memory
-
- override val clock: Clock
- get() = this@SimAbstractHypervisor.context.clock
-
- override val meta: Map<String, Any> = meta
- }
-
- interpreter.batch {
- workload.onStart(ctx)
-
- for (cpu in cpus) {
- launch {
- cpu.consume(workload.getConsumer(ctx, cpu.model))
- }
- }
- }
- }
- }
-
- /**
- * Terminate this VM instance.
- */
override fun close() {
- if (!isTerminated) {
- isTerminated = true
+ super.close()
- cpus.forEach(SimProcessingUnit::close)
- _vms.remove(this)
- }
+ _vms.remove(this)
}
}
override fun onStart(ctx: SimMachineContext) {
context = ctx
switch = createSwitch(ctx)
- }
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- val forwarder = SimResourceForwarder()
- switch.addInput(forwarder)
- return forwarder
+ for (cpu in ctx.cpus) {
+ switch.addInput(cpu)
+ }
}
/**
- * The [SimProcessingUnit] of this machine.
+ * A [SimProcessingUnit] of a virtual machine.
*/
- public inner class ProcessingUnitImpl(override val model: ProcessingUnit, switch: SimResourceSwitch) : SimProcessingUnit {
- /**
- * The actual resource supporting the processing unit.
- */
- private val source = switch.newOutput()
-
- override val state: SimResourceState
- get() = source.state
-
- override val capacity: Double
- get() = source.capacity
-
- override val speed: Double
- get() = source.speed
-
- override val demand: Double
- get() = source.demand
-
- override val counters: SimResourceCounters
- get() = source.counters
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- source.startConsumer(consumer)
- }
-
- override fun interrupt() {
- source.interrupt()
- }
-
- override fun cancel() {
- source.cancel()
- }
-
- override fun close() {
- source.close()
- }
+ private class VCpu(
+ private val source: SimResourceProvider,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, SimResourceProvider by source {
+ override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index d40cdac5..e12ac72b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -27,24 +27,27 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
-import org.opendc.simulator.resources.batch
-import org.opendc.simulator.resources.consume
-import java.time.Clock
+import org.opendc.simulator.resources.*
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
*
* @param interpreter The interpreter to manage the machine's resources.
* @param parent The parent simulation system.
+ * @param model The model of the machine.
*/
public abstract class SimAbstractMachine(
protected val interpreter: SimResourceInterpreter,
- final override val parent: SimResourceSystem?
+ final override val parent: SimResourceSystem?,
+ final override val model: SimMachineModel
) : SimMachine, SimResourceSystem {
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
private val _usage = MutableStateFlow(0.0)
- override val usage: StateFlow<Double>
+ public final override val usage: StateFlow<Double>
get() = _usage
/**
@@ -55,49 +58,66 @@ public abstract class SimAbstractMachine(
private var _speed = doubleArrayOf()
/**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
- /**
* The resources allocated for this machine.
*/
protected abstract val cpus: List<SimProcessingUnit>
/**
- * The execution context in which the workload runs.
+ * A flag to indicate that the machine is terminated.
*/
- private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val clock: Clock
- get() = interpreter.clock
-
- override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
+ private var isTerminated = false
- override val memory: List<MemoryUnit> = model.memory
- }
+ /**
+ * The continuation to resume when the virtual machine workload has finished.
+ */
+ private var cont: Continuation<Unit>? = null
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = coroutineScope {
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
check(!isTerminated) { "Machine is terminated" }
+ check(cont == null) { "A machine cannot run concurrently" }
+
val ctx = Context(meta)
// Before the workload starts, initialize the initial power draw
_speed = DoubleArray(model.cpus.size) { 0.0 }
updateUsage(0.0)
- interpreter.batch {
- workload.onStart(ctx)
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+
+ // Cancel all cpus on cancellation
+ cont.invokeOnCancellation {
+ this.cont = null
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
+ }
+ }
+
+ interpreter.batch { workload.onStart(ctx) }
+ }
+ }
+
+ override fun close() {
+ if (isTerminated) {
+ return
+ }
+
+ isTerminated = true
+ cancel()
+ interpreter.batch {
for (cpu in cpus) {
- val model = cpu.model
- val consumer = workload.getConsumer(ctx, model)
- launch { cpu.consume(consumer) }
+ cpu.close()
}
}
}
+ /* SimResourceSystem */
override fun onConverge(timestamp: Long) {
val totalCapacity = model.cpus.sumOf { it.frequency }
val cpus = cpus
@@ -117,12 +137,34 @@ public abstract class SimAbstractMachine(
_usage.value = usage
}
- override fun close() {
- if (isTerminated) {
- return
+ /**
+ * Cancel the workload that is currently running on the machine.
+ */
+ private fun cancel() {
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
}
- isTerminated = true
- cpus.forEach(SimProcessingUnit::close)
+ val cont = cont
+ if (cont != null) {
+ this.cont = null
+ cont.resume(Unit)
+ }
+ }
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val interpreter: SimResourceInterpreter
+ get() = this@SimAbstractMachine.interpreter
+
+ override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
+
+ override val memory: List<MemoryUnit> = model.memory
+
+ override fun close() = cancel()
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 082719e2..7a49f29a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -44,12 +44,14 @@ import org.opendc.simulator.resources.SimResourceInterpreter
@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
public class SimBareMetalMachine(
interpreter: SimResourceInterpreter,
- override val model: SimMachineModel,
+ model: SimMachineModel,
scalingGovernor: ScalingGovernor,
scalingDriver: ScalingDriver,
parent: SimResourceSystem? = null,
-) : SimAbstractMachine(interpreter, parent) {
- override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
+) : SimAbstractMachine(interpreter, parent, model) {
+ override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu ->
+ Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu)
+ }
/**
* Construct the [ScalingDriver.Logic] for this machine.
@@ -81,43 +83,12 @@ public class SimBareMetalMachine(
}
/**
- * The [SimProcessingUnit] of this machine.
+ * A [SimProcessingUnit] of a bare-metal machine.
*/
- public inner class ProcessingUnitImpl(override val model: ProcessingUnit) : SimProcessingUnit {
- /**
- * The actual resource supporting the processing unit.
- */
- private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine)
-
- override val state: SimResourceState
- get() = source.state
-
- override val capacity: Double
- get() = source.capacity
-
- override val speed: Double
- get() = source.speed
-
- override val demand: Double
- get() = source.demand
-
- override val counters: SimResourceCounters
- get() = source.counters
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- source.startConsumer(consumer)
- }
-
- override fun interrupt() {
- source.interrupt()
- }
-
- override fun cancel() {
- source.cancel()
- }
-
- override fun close() {
- source.close()
- }
+ private class Cpu(
+ private val source: SimResourceProvider,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, SimResourceProvider by source {
+ override fun toString(): String = "SimBareMetalMachine.Cpu[model=$model]"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index c2523a2a..5cbabc86 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -23,18 +23,18 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.MemoryUnit
-import java.time.Clock
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A simulated execution context in which a bootable image runs. This interface represents the
* firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on
* which the image runs.
*/
-public interface SimMachineContext {
+public interface SimMachineContext : AutoCloseable {
/**
- * The virtual clock tracking simulation time.
+ * The resource interpreter that simulates the machine.
*/
- public val clock: Clock
+ public val interpreter: SimResourceInterpreter
/**
* The metadata associated with the context.
@@ -50,4 +50,9 @@ public interface SimMachineContext {
* The memory available on the machine
*/
public val memory: List<MemoryUnit>
+
+ /**
+ * Stop the workload.
+ */
+ public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt
new file mode 100644
index 00000000..43662d93
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.util
+
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
+
+/**
+ * A helper class to manage the lifecycle of a [SimWorkload]
+ */
+public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
+ /**
+ * The resource consumers which represent the lifecycle of the workload.
+ */
+ private val waiting = mutableSetOf<SimResourceConsumer>()
+
+ /**
+ * Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
+ */
+ public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer {
+ waiting.add(consumer)
+ return object : SimResourceConsumer by consumer {
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ try {
+ consumer.onEvent(ctx, event)
+ } finally {
+ if (event == SimResourceEvent.Exit) {
+ complete(consumer)
+ }
+ }
+ }
+
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ try {
+ consumer.onFailure(ctx, cause)
+ } finally {
+ complete(consumer)
+ }
+ }
+
+ override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
+ }
+ }
+
+ /**
+ * Complete the specified [SimResourceConsumer].
+ */
+ private fun complete(consumer: SimResourceConsumer) {
+ if (waiting.remove(consumer) && waiting.isEmpty()) {
+ ctx.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 63c9d28c..de6832ca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,8 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
@@ -43,10 +42,11 @@ public class SimFlopsWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)))
+ }
}
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index a3420e32..318a6b49 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -23,8 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
@@ -42,11 +41,12 @@ public class SimRuntimeWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- val limit = cpu.frequency * utilization
- return SimWorkConsumer((limit / 1000) * duration, utilization)
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ for (cpu in ctx.cpus) {
+ val limit = cpu.capacity * utilization
+ cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization)))
+ }
}
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index ffb332d1..6929f4d2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -44,33 +45,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
barrier = SimConsumerBarrier(ctx.cpus.size)
fragment = nextFragment()
- offset = ctx.clock.millis()
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- return object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val usage = fragment.usage / fragment.cores
- val work = (fragment.duration / 1000) * usage
- val deadline = offset + fragment.duration
-
- assert(deadline >= now) { "Deadline already passed" }
-
- val cmd =
- if (cpu.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ offset = ctx.interpreter.clock.millis()
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
- }
+ val lifecycle = SimWorkloadLifecycle(ctx)
- return cmd
- }
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model)))
}
}
@@ -87,6 +67,31 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
}
}
+ private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val usage = fragment.usage / fragment.cores
+ val work = (fragment.duration / 1000) * usage
+ val deadline = offset + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ val cmd =
+ if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ if (barrier.enter()) {
+ this@SimTraceWorkload.fragment = nextFragment()
+ this@SimTraceWorkload.offset += fragment.duration
+ }
+
+ return cmd
+ }
+ }
+
/**
* A fragment of the workload.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index bdc12bb5..b80665fa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -23,8 +23,6 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
/**
* A model that characterizes the runtime behavior of some particular workload.
@@ -35,11 +33,8 @@ import org.opendc.simulator.resources.SimResourceConsumer
public interface SimWorkload {
/**
* This method is invoked when the workload is started.
+ *
+ * @param ctx The execution context in which the machine runs.
*/
public fun onStart(ctx: SimMachineContext)
-
- /**
- * Obtain the resource consumer for the specified processing unit.
- */
- public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer
}