summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 20:57:51 +0200
committerGitHub <noreply@github.com>2021-10-25 20:57:51 +0200
commita8e2d460a3b6803845687585ae0b34e67a9445a3 (patch)
tree6249023f8f0d56392400c7ebb72238ee848f740a /opendc-simulator/opendc-simulator-compute/src
parentb4bf7268cbb6d22d3966f469a6b7721b04d91907 (diff)
parent86c65e875b7dde8872dc81a37aa9dca72eee7782 (diff)
merge: Improve the OpenDC compute model (#37)
This pull request contains various improvements to the OpenDC compute simulation model. - Support filtering hosts based on CPU capacity - Do not allocate lambda in fast-path - Redesign VM interference algorithm - Report provisioning time of virtual machines - Prevent allocations during collection cycle - Use correct flow input capacity for counters - Support running workloads without coroutines **Breaking API Changes** - `VirtualMachine` now requires `cpuCapacity` parameter. - `VmInterferenceModel` needs to be constructed using `VmInterferenceModel.Builder` and can't be passed a list of groups anymore. - Scheduling latency is not collected anymore. Instead, use the boot time and provisioning time to derive the scheduling latency. - Telemetry data is recorded using `*TableReader` interfaces as opposed to the `*Data` classes. These classes are re-used per row and should not be shared with other threads, since the underlying data may change. - `SimMachine` does not implement `AutoCloseable` anymore. Machines can be removed from a `SimHypervisor` using the `removeMachine` method. - `SimMachine.run` is moved to an extension method called `runWorkload`. Users can now also choose to run a workload using the asynchronous `SimMachine.startWorkload`.
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt32
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt69
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt147
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt17
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt107
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt22
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt44
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt385
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt53
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt211
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt57
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt63
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt35
18 files changed, 806 insertions, 458 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index cb52d24f..91e91f9d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -76,7 +76,7 @@ class SimMachineBenchmarks {
val machine = SimBareMetalMachine(
engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- return@runBlockingSimulation machine.run(SimTraceWorkload(trace))
+ return@runBlockingSimulation machine.runWorkload(SimTraceWorkload(trace))
}
}
@@ -89,15 +89,15 @@ class SimMachineBenchmarks {
)
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
- val vm = hypervisor.createMachine(machineModel)
+ val vm = hypervisor.newMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(SimTraceWorkload(trace))
+ return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace))
} finally {
- vm.close()
- machine.close()
+ vm.cancel()
+ machine.cancel()
}
}
}
@@ -111,15 +111,15 @@ class SimMachineBenchmarks {
)
val hypervisor = SimFairShareHypervisor(engine, null, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
- val vm = hypervisor.createMachine(machineModel)
+ val vm = hypervisor.newMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(SimTraceWorkload(trace))
+ return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace))
} finally {
- vm.close()
- machine.close()
+ vm.cancel()
+ machine.cancel()
}
}
}
@@ -133,22 +133,22 @@ class SimMachineBenchmarks {
)
val hypervisor = SimFairShareHypervisor(engine, null, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
coroutineScope {
repeat(2) {
- val vm = hypervisor.createMachine(machineModel)
+ val vm = hypervisor.newMachine(machineModel)
launch {
try {
- vm.run(SimTraceWorkload(trace))
+ vm.runWorkload(SimTraceWorkload(trace))
} finally {
- machine.close()
+ machine.cancel()
}
}
}
}
- machine.close()
+ machine.cancel()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
new file mode 100644
index 00000000..c23f48dc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.simulator.compute.workload.SimWorkload
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+
+/**
+ * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished.
+ *
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * @return A [SimMachineContext] that represents the execution context for the workload.
+ * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
+ */
+public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) {
+ return suspendCancellableCoroutine { cont ->
+ cont.invokeOnCancellation { this@runWorkload.cancel() }
+
+ startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ try {
+ workload.onStop(ctx)
+
+ if (!cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+ },
+ meta
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 60a10f20..6a4c594d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
+import mu.KotlinLogging
import org.opendc.simulator.compute.device.SimNetworkAdapter
import org.opendc.simulator.compute.device.SimPeripheral
import org.opendc.simulator.compute.model.MachineModel
@@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter
import org.opendc.simulator.compute.model.StorageDevice
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
@@ -72,48 +71,20 @@ public abstract class SimAbstractMachine(
public override val peripherals: List<SimPeripheral> = net.map { it as SimNetworkAdapter }
/**
- * A flag to indicate that the machine is terminated.
+ * The current active [Context].
*/
- private var isTerminated = false
+ private var _ctx: Context? = null
- /**
- * The continuation to resume when the virtual machine workload has finished.
- */
- private var cont: Continuation<Unit>? = null
-
- /**
- * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- check(!isTerminated) { "Machine is terminated" }
- check(cont == null) { "A machine cannot run concurrently" }
-
- val ctx = Context(meta)
-
- return suspendCancellableCoroutine { cont ->
- this.cont = cont
-
- // Cancel all cpus on cancellation
- cont.invokeOnCancellation {
- this.cont = null
- engine.batch {
- for (cpu in cpus) {
- cpu.cancel()
- }
- }
- }
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
+ check(_ctx == null) { "A machine cannot run concurrently" }
- engine.batch { workload.onStart(ctx) }
- }
+ val ctx = Context(workload, meta)
+ ctx.start()
+ return ctx
}
- override fun close() {
- if (isTerminated) {
- return
- }
-
- isTerminated = true
- cancel()
+ override fun cancel() {
+ _ctx?.close()
}
override fun onConverge(now: Long, delta: Long) {
@@ -121,29 +92,35 @@ public abstract class SimAbstractMachine(
}
/**
- * Cancel the workload that is currently running on the machine.
+ * The execution context in which the workload runs.
+ *
+ * @param workload The workload that is running on the machine.
+ * @param meta The metadata passed to the workload.
*/
- private fun cancel() {
- engine.batch {
- for (cpu in cpus) {
- cpu.cancel()
+ private inner class Context(
+ private val workload: SimWorkload,
+ override val meta: Map<String, Any>
+ ) : SimMachineContext {
+ /**
+ * A flag to indicate that the context has been closed.
+ */
+ private var isClosed = false
+
+ override val engine: FlowEngine = this@SimAbstractMachine.engine
+
+ /**
+ * Start this context.
+ */
+ fun start() {
+ try {
+ _ctx = this
+ engine.batch { workload.onStart(this) }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Workload failed during onStart callback" }
+ close()
}
}
- val cont = cont
- if (cont != null) {
- this.cont = null
- cont.resume(Unit)
- }
- }
-
- /**
- * The execution context in which the workload runs.
- */
- private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val engine: FlowEngine
- get() = this@SimAbstractMachine.engine
-
override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
override val memory: SimMemory = this@SimAbstractMachine.memory
@@ -152,7 +129,49 @@ public abstract class SimAbstractMachine(
override val storage: List<SimStorageInterface> = this@SimAbstractMachine.storage
- override fun close() = cancel()
+ override fun close() {
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" }
+ _ctx = null
+
+ // Cancel all the resources associated with the machine
+ doCancel()
+
+ try {
+ workload.onStop(this)
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Workload failed during onStop callback" }
+ }
+ }
+
+ /**
+ * Run the stop procedures for the resources associated with the machine.
+ */
+ private fun doCancel() {
+ engine.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
+
+ memory.cancel()
+
+ for (ifx in net) {
+ (ifx as NetworkAdapterImpl).disconnect()
+ }
+
+ for (storage in storage) {
+ val impl = storage as StorageDeviceImpl
+ impl.read.cancel()
+ impl.write.cancel()
+ }
+ }
+ }
+
+ override fun toString(): String = "SimAbstractMachine.Context"
}
/**
@@ -166,7 +185,7 @@ public abstract class SimAbstractMachine(
* The [SimNetworkAdapter] implementation for a machine.
*/
private class NetworkAdapterImpl(
- private val engine: FlowEngine,
+ engine: FlowEngine,
model: NetworkAdapter,
index: Int
) : SimNetworkAdapter(), SimNetworkInterface {
@@ -208,4 +227,12 @@ public abstract class SimAbstractMachine(
override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]"
}
+
+ private companion object {
+ /**
+ * The logging instance associated with this class.
+ */
+ @JvmStatic
+ private val logger = KotlinLogging.logger {}
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index ab0b56ae..94581e89 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload
/**
* A generic machine that is able to run a [SimWorkload].
*/
-public interface SimMachine : AutoCloseable {
+public interface SimMachine {
/**
* The model of the machine containing its specifications.
*/
@@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable {
public val peripherals: List<SimPeripheral>
/**
- * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Start the specified [SimWorkload] on this machine.
+ *
+ * @param workload The workload to start on the machine.
+ * @param meta The metadata to pass to the workload.
+ * @return A [SimMachineContext] that represents the execution context for the workload.
+ * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed.
*/
- public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
+ public fun startWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()): SimMachineContext
/**
- * Terminate this machine.
+ * Cancel the workload that is currently running on this machine.
+ *
+ * If no workload is active, this operation is a no-op.
*/
- public override fun close()
+ public fun cancel()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index f6d8f628..07465126 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -28,7 +28,9 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
import org.opendc.simulator.flow.mux.FlowMultiplexer
import kotlin.math.roundToLong
@@ -92,13 +94,20 @@ public abstract class SimAbstractHypervisor(
private val governors = mutableListOf<ScalingGovernor.Logic>()
/* SimHypervisor */
- override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
+ override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
require(canFit(model)) { "Machine does not fit" }
val vm = VirtualMachine(model, interferenceId)
_vms.add(vm)
return vm
}
+ override fun removeMachine(machine: SimVirtualMachine) {
+ if (_vms.remove(machine)) {
+ // This cast must always succeed, since `_vms` only contains `VirtualMachine` types.
+ (machine as VirtualMachine).close()
+ }
+ }
+
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
@@ -121,6 +130,8 @@ public abstract class SimAbstractHypervisor(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
private var _cpuCount = 0
private var _cpuCapacity = 0.0
@@ -141,33 +152,31 @@ public abstract class SimAbstractHypervisor(
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine {
+ private inner class VirtualMachine(
+ model: MachineModel,
+ interferenceId: String? = null
+ ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable {
+ /**
+ * A flag to indicate that the machine is closed.
+ */
+ private var isClosed = false
+
/**
* The interference key of this virtual machine.
*/
- private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) }
+ private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) }
/**
* The vCPUs of the machine.
*/
- override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) }
+ override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) }
/**
* The resource counters associated with the hypervisor.
*/
override val counters: SimHypervisorCounters
get() = _counters
- private val _counters = object : SimHypervisorCounters {
- private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
-
- override val cpuActiveTime: Long
- get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
- override val cpuIdleTime: Long
- get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
- override val cpuStealTime: Long
- get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
- override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong()
- }
+ private val _counters = VmCountersImpl(cpus)
/**
* The CPU capacity of the hypervisor in MHz.
@@ -187,14 +196,58 @@ public abstract class SimAbstractHypervisor(
override val cpuUsage: Double
get() = cpus.sumOf(FlowConsumer::rate)
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
+ check(!isClosed) { "Machine is closed" }
+
+ return super.startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ joinInterferenceDomain()
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ leaveInterferenceDomain()
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ leaveInterferenceDomain()
+ workload.onStop(ctx)
+ }
+ },
+ meta
+ )
+ }
+
override fun close() {
- super.close()
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ cancel()
for (cpu in cpus) {
cpu.close()
}
+ }
- _vms.remove(this)
+ /**
+ * Join the interference domain of the hypervisor.
+ */
+ private fun joinInterferenceDomain() {
+ val interferenceKey = interferenceKey
+ if (interferenceKey != null) {
+ interferenceDomain?.join(interferenceKey)
+ }
+ }
+
+ /**
+ * Leave the interference domain of the hypervisor.
+ */
+ private fun leaveInterferenceDomain() {
+ val interferenceKey = interferenceKey
if (interferenceKey != null) {
interferenceDomain?.leave(interferenceKey)
}
@@ -211,9 +264,7 @@ public abstract class SimAbstractHypervisor(
) : SimProcessingUnit, FlowConsumer by source {
override var capacity: Double
get() = source.capacity
- set(_) {
- // Ignore capacity changes
- }
+ set(_) = TODO("Capacity changes on vCPU not supported")
override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
@@ -287,4 +338,20 @@ public abstract class SimAbstractHypervisor(
cpuTime[3] += (interferenceDelta * d).roundToLong()
}
}
+
+ /**
+ * A [SimHypervisorCounters] implementation for a virtual machine.
+ */
+ private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters {
+ private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
+
+ override val cpuActiveTime: Long
+ get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
+ override val cpuIdleTime: Long
+ get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
+ override val cpuStealTime: Long
+ get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
+ override val cpuLostTime: Long
+ get() = (cpus.sumOf { it.counters.interference } * d).roundToLong()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 57d4cf20..a69f419f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload {
* @param model The machine to create.
* @param interferenceId An identifier for the interference model.
*/
- public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+ public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+
+ /**
+ * Remove the specified [machine] from the hypervisor.
+ *
+ * @param machine The machine to remove.
+ */
+ public fun removeMachine(machine: SimVirtualMachine)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index b737d61a..09b03306 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey
*/
public interface VmInterferenceDomain : InterferenceDomain {
/**
- * Join this interference domain.
+ * Construct an [InterferenceKey] for the specified [id].
*
* @param id The identifier of the virtual machine.
+ * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine
+ * does not participate in the domain.
*/
- public fun join(id: String): InterferenceKey
+ public fun createKey(id: String): InterferenceKey?
/**
- * Leave this interference domain.
+ * Remove the specified [key] from this domain.
+ */
+ public fun removeKey(key: InterferenceKey)
+
+ /**
+ * Mark the specified [key] as active in this interference domain.
+ *
+ * @param key The key to join the interference domain with.
+ */
+ public fun join(key: InterferenceKey)
+
+ /**
+ * Mark the specified [key] as inactive in this interference domain.
+ *
+ * @param key The key of the virtual machine that wants to leave the domain.
*/
public fun leave(key: InterferenceKey)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt
deleted file mode 100644
index 708ddede..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel.interference
-
-/**
- * A group of virtual machines that together can interfere when operating on the same resources, causing performance
- * variability.
- */
-public data class VmInterferenceGroup(
- /**
- * The minimum load of the host before the interference occurs.
- */
- public val targetLoad: Double,
-
- /**
- * A score in [0, 1] representing the performance variability as a result of resource interference.
- */
- public val score: Double,
-
- /**
- * The members of this interference group.
- */
- public val members: Set<String>
-)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
index b3d72507..977292be 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -28,143 +28,366 @@ import java.util.*
/**
* An interference model that models the resource interference between virtual machines on a host.
*
- * @param groups The groups of virtual machines that interfere with each other.
- * @param random The [Random] instance to select the affected virtual machines.
+ * @param targets The target load of each group.
+ * @param scores The performance score of each group.
+ * @param members The members belonging to each group.
+ * @param membership The identifier of each key.
+ * @param size The number of groups.
+ * @param seed The seed to use for randomly selecting the virtual machines that are affected.
*/
-public class VmInterferenceModel(
- private val groups: List<VmInterferenceGroup>,
- private val random: Random = Random(0)
+public class VmInterferenceModel private constructor(
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val idMapping: Map<String, Int>,
+ private val members: Array<IntArray>,
+ private val membership: Array<IntArray>,
+ private val size: Int,
+ seed: Long,
) {
/**
+ * A [SplittableRandom] used for selecting the virtual machines that are affected.
+ */
+ private val random = SplittableRandom(seed)
+
+ /**
* Construct a new [VmInterferenceDomain].
*/
- public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain {
+ public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random)
+
+ /**
+ * Create a copy of this model with a different seed.
+ */
+ public fun withSeed(seed: Long): VmInterferenceModel {
+ return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed)
+ }
+
+ public companion object {
/**
- * The stateful groups of this domain.
+ * Construct a [Builder] instance.
*/
- private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) }
+ @JvmStatic
+ public fun builder(): Builder = Builder()
+ }
+ /**
+ * Builder class for a [VmInterferenceModel]
+ */
+ public class Builder internal constructor() {
/**
- * The set of keys active in this domain.
+ * The initial capacity of the builder.
*/
- private val keys = mutableSetOf<InterferenceKeyImpl>()
+ private val INITIAL_CAPACITY = 256
- override fun join(id: String): InterferenceKey {
- val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad })
- keys += key
- return key
- }
+ /**
+ * The target load of each group.
+ */
+ private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
- override fun leave(key: InterferenceKey) {
- if (key is InterferenceKeyImpl) {
- keys -= key
- key.leave()
+ /**
+ * The performance score of each group.
+ */
+ private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
+
+ /**
+ * The members of each group.
+ */
+ private var _members = ArrayList<Set<String>>(INITIAL_CAPACITY)
+
+ /**
+ * The mapping from member to group id.
+ */
+ private val ids = TreeSet<String>()
+
+ /**
+ * The number of groups in the model.
+ */
+ private var size = 0
+
+ /**
+ * Add the specified group to the model.
+ */
+ public fun addGroup(members: Set<String>, targetLoad: Double, score: Double): Builder {
+ val size = size
+
+ if (size == _targets.size) {
+ grow()
}
+
+ _targets[size] = targetLoad
+ _scores[size] = score
+ _members.add(members)
+ ids.addAll(members)
+
+ this.size++
+
+ return this
}
- override fun apply(key: InterferenceKey?, load: Double): Double {
- if (key == null || key !is InterferenceKeyImpl) {
- return 1.0
- }
+ /**
+ * Build the [VmInterferenceModel].
+ */
+ public fun build(seed: Long = 0): VmInterferenceModel {
+ val size = size
+ val targets = _targets
+ val scores = _scores
+ val members = _members
- val ctx = key.findGroup(load)
- val group = ctx?.group
+ val indices = Array(size) { it }
+ indices.sortWith(
+ Comparator { l, r ->
+ var cmp = targets[l].compareTo(targets[r]) // Order by target load
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
- // Apply performance penalty to (on average) only one of the VMs
- return if (group != null && random.nextInt(group.members.size) == 0) {
- group.score
- } else {
- 1.0
+ cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first)
+ if (cmp != 0)
+ cmp
+ else
+ l.compareTo(r)
+ }
+ )
+
+ val newTargets = DoubleArray(size)
+ val newScores = DoubleArray(size)
+ val newMembers = arrayOfNulls<IntArray>(size)
+
+ var nextId = 0
+ val idMapping = ids.associateWith { nextId++ }
+ val membership = ids.associateWithTo(TreeMap()) { ArrayList<Int>() }
+
+ for ((group, j) in indices.withIndex()) {
+ newTargets[group] = targets[j]
+ newScores[group] = scores[j]
+ val groupMembers = members[j]
+ val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray()
+
+ newGroupMembers.sort()
+ newMembers[group] = newGroupMembers
+
+ for (member in groupMembers) {
+ membership.getValue(member).add(group)
+ }
}
+
+ @Suppress("UNCHECKED_CAST")
+ return VmInterferenceModel(
+ newTargets,
+ newScores,
+ idMapping,
+ newMembers as Array<IntArray>,
+ membership.map { it.value.toIntArray() }.toTypedArray(),
+ size,
+ seed
+ )
}
- override fun toString(): String = "VmInterferenceDomain"
+ /**
+ * Helper function to grow the capacity of the internal arrays.
+ */
+ private fun grow() {
+ val oldSize = _targets.size
+ val newSize = oldSize + (oldSize shr 1)
+
+ _targets = _targets.copyOf(newSize)
+ _scores = _scores.copyOf(newSize)
+ }
}
/**
- * An interference key.
- *
- * @param id The identifier of the member.
- * @param groups The groups to which the key belongs.
+ * Internal implementation of [VmInterferenceDomain].
*/
- private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey {
- init {
- for (group in groups) {
- group.join(this)
- }
- }
+ private class InterferenceDomainImpl(
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val idMapping: Map<String, Int>,
+ private val members: Array<IntArray>,
+ private val membership: Array<IntArray>,
+ private val random: SplittableRandom
+ ) : VmInterferenceDomain {
+ /**
+ * Keys registered with this domain.
+ */
+ private val keys = HashMap<Int, InterferenceKeyImpl>()
/**
- * Find the active group that applies for the interference member.
+ * The set of keys active in this domain.
*/
- fun findGroup(load: Double): GroupContext? {
- // Find the first active group whose target load is lower than the current load
- val index = groups.binarySearchBy(load) { it.group.targetLoad }
- val target = if (index >= 0) index else -(index + 1)
+ private val activeKeys = ArrayList<InterferenceKeyImpl>()
- // Check whether there are active groups ahead of the index
- for (i in target until groups.size) {
- val group = groups[i]
- if (group.group.targetLoad > load) {
- break
- } else if (group.isActive) {
- return group
+ override fun createKey(id: String): InterferenceKey? {
+ val intId = idMapping[id] ?: return null
+ return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) }
+ }
+
+ override fun removeKey(key: InterferenceKey) {
+ if (key !is InterferenceKeyImpl) {
+ return
+ }
+
+ if (activeKeys.remove(key)) {
+ computeActiveGroups(key.id)
+ }
+
+ keys.remove(key.id)
+ }
+
+ override fun join(key: InterferenceKey) {
+ if (key !is InterferenceKeyImpl) {
+ return
+ }
+
+ if (key.acquire()) {
+ val pos = activeKeys.binarySearch(key)
+ if (pos < 0) {
+ activeKeys.add(-pos - 1, key)
}
+ computeActiveGroups(key.id)
}
+ }
+
+ override fun leave(key: InterferenceKey) {
+ if (key is InterferenceKeyImpl && key.release()) {
+ activeKeys.remove(key)
+ computeActiveGroups(key.id)
+ }
+ }
+
+ override fun apply(key: InterferenceKey?, load: Double): Double {
+ if (key == null || key !is InterferenceKeyImpl) {
+ return 1.0
+ }
+
+ val groups = key.groups
+ val groupSize = groups.size
+
+ if (groupSize == 0) {
+ return 1.0
+ }
+
+ val targets = targets
+ val scores = scores
+ var low = 0
+ var high = groups.size - 1
- // Check whether there are active groups before the index
- for (i in (target - 1) downTo 0) {
- val group = groups[i]
- if (group.isActive) {
- return group
+ var group = -1
+ var score = 1.0
+
+ // Perform binary search over the groups based on target load
+ while (low <= high) {
+ val mid = low + high ushr 1
+ val midGroup = groups[mid]
+ val target = targets[midGroup]
+
+ if (target < load) {
+ low = mid + 1
+ group = midGroup
+ score = scores[midGroup]
+ } else if (target > load) {
+ high = mid - 1
+ } else {
+ group = midGroup
+ score = scores[midGroup]
+ break
}
}
- return null
+ return if (group >= 0 && random.nextInt(members[group].size) == 0) {
+ score
+ } else {
+ 1.0
+ }
}
+ override fun toString(): String = "VmInterferenceDomain"
+
/**
- * Leave all the groups.
+ * Queue of participants that will be removed or added to the active groups.
*/
- fun leave() {
+ private val _participants = ArrayDeque<InterferenceKeyImpl>()
+
+ /**
+ * (Re-)Compute the active groups.
+ */
+ private fun computeActiveGroups(id: Int) {
+ val activeKeys = activeKeys
+ val groups = membership[id]
+
+ if (activeKeys.isEmpty()) {
+ return
+ }
+
+ val members = members
+ val participants = _participants
+
for (group in groups) {
- group.leave(this)
+ val groupMembers = members[group]
+
+ var i = 0
+ var j = 0
+ var intersection = 0
+
+ // Compute the intersection of the group members and the current active members
+ while (i < groupMembers.size && j < activeKeys.size) {
+ val l = groupMembers[i]
+ val rightEntry = activeKeys[j]
+ val r = rightEntry.id
+
+ if (l < r) {
+ i++
+ } else if (l > r) {
+ j++
+ } else {
+ participants.add(rightEntry)
+ intersection++
+
+ i++
+ j++
+ }
+ }
+
+ while (true) {
+ val participant = participants.poll() ?: break
+ val participantGroups = participant.groups
+ if (intersection <= 1) {
+ participantGroups.remove(group)
+ } else {
+ val pos = participantGroups.binarySearch(group)
+ if (pos < 0) {
+ participantGroups.add(-pos - 1, group)
+ }
+ }
+ }
}
}
}
/**
- * A group context is used to track the active keys per interference group.
+ * An interference key.
+ *
+ * @param id The identifier of the member.
*/
- private inner class GroupContext(val group: VmInterferenceGroup) {
+ private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> {
/**
- * The active keys that are part of this group.
+ * The active groups to which the key belongs.
*/
- private val keys = mutableSetOf<InterferenceKeyImpl>()
+ @JvmField val groups: MutableList<Int> = ArrayList()
/**
- * A flag to indicate that the group is active.
+ * The number of users of the interference key.
*/
- val isActive
- get() = keys.size > 1
+ private var refCount: Int = 0
/**
- * Determine whether the specified [id] is part of this group.
+ * Join the domain.
*/
- operator fun contains(id: String): Boolean = id in group.members
+ fun acquire(): Boolean = refCount++ <= 0
/**
- * Join this group with the specified [key].
+ * Leave the domain.
*/
- fun join(key: InterferenceKeyImpl) {
- keys += key
- }
+ fun release(): Boolean = --refCount <= 0
- /**
- * Leave this group with the specified [key].
- */
- fun leave(key: InterferenceKeyImpl) {
- keys -= key
- }
+ override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 99f4a1e1..726d1f56 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -48,5 +48,7 @@ public class SimFlopsWorkload(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 2ef3bc43..8a3f5f84 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -48,5 +48,7 @@ public class SimRuntimeWorkload(
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 53c98409..ce04a790 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L
}
}
+ override fun onStop(ctx: SimMachineContext) {}
+
override fun toString(): String = "SimTraceWorkload"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index b80665fa..61c6e2ad 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -37,4 +37,11 @@ public interface SimWorkload {
* @param ctx The execution context in which the machine runs.
*/
public fun onStart(ctx: SimMachineContext)
+
+ /**
+ * This method is invoked when the workload is stopped.
+ *
+ * @param ctx The execution context in which the machine runs.
+ */
+ public fun onStop(ctx: SimMachineContext)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index cc4f1f6a..742470a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
/**
* The resource consumers which represent the lifecycle of the workload.
*/
- private val waiting = mutableSetOf<FlowSource>()
+ private val waiting = HashSet<Wrapper>()
/**
- * Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
+ * Wait for the specified [source] to complete before ending the lifecycle of the workload.
*/
- public fun waitFor(consumer: FlowSource): FlowSource {
- waiting.add(consumer)
- return object : FlowSource by consumer {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- try {
- consumer.onStop(conn, now, delta)
- } finally {
- complete(consumer)
- }
- }
- override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
- }
+ public fun waitFor(source: FlowSource): FlowSource {
+ val wrapper = Wrapper(source)
+ waiting.add(wrapper)
+ return wrapper
}
/**
- * Complete the specified [FlowSource].
+ * Complete the specified [Wrapper].
*/
- private fun complete(consumer: FlowSource) {
- if (waiting.remove(consumer) && waiting.isEmpty()) {
+ private fun complete(wrapper: Wrapper) {
+ if (waiting.remove(wrapper) && waiting.isEmpty()) {
ctx.close()
}
}
+
+ /**
+ * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that is has completed.
+ */
+ private inner class Wrapper(private val delegate: FlowSource) : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ delegate.onStart(conn, now)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ delegate.onConverge(conn, now, delta)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate.onStop(conn, now, delta)
+ } finally {
+ complete(this)
+ }
+ }
+
+ override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]"
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 0bb24ed8..644eb497 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -65,14 +65,10 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
- // Two cores execute 1000 MFlOps per second (1000 ms)
- assertEquals(1000, clock.millis())
- } finally {
- machine.close()
- }
+ // Two cores execute 1000 MFlOps per second (1000 ms)
+ assertEquals(1000, clock.millis())
}
@Test
@@ -88,14 +84,10 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
- // Two sockets with two cores execute 2000 MFlOps per second (500 ms)
- assertEquals(500, clock.millis())
- } finally {
- machine.close()
- }
+ // Two sockets with two cores execute 2000 MFlOps per second (500 ms)
+ assertEquals(500, clock.millis())
}
@Test
@@ -109,16 +101,12 @@ class SimMachineTest {
val source = SimPowerSource(engine, capacity = 1000.0)
source.connect(machine.psu)
- try {
- coroutineScope {
- launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
- assertAll(
- { assertEquals(100.0, machine.psu.powerDraw) },
- { assertEquals(100.0, source.powerDraw) }
- )
- }
- } finally {
- machine.close()
+ coroutineScope {
+ launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) }
+ assertAll(
+ { assertEquals(100.0, machine.psu.powerDraw) },
+ { assertEquals(100.0, source.powerDraw) }
+ )
}
}
@@ -130,22 +118,20 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val cpu = ctx.cpus[0]
-
- cpu.capacity = cpu.model.frequency + 1000.0
- assertEquals(cpu.model.frequency, cpu.capacity)
- cpu.capacity = -1.0
- assertEquals(0.0, cpu.capacity)
-
- ctx.close()
- }
- })
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val cpu = ctx.cpus[0]
+
+ cpu.capacity = cpu.model.frequency + 1000.0
+ assertEquals(cpu.model.frequency, cpu.capacity)
+ cpu.capacity = -1.0
+ assertEquals(0.0, cpu.capacity)
+
+ ctx.close()
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
}
@Test
@@ -156,16 +142,14 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- assertEquals(32_000 * 4.0, ctx.memory.capacity)
- ctx.close()
- }
- })
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ assertEquals(32_000 * 4.0, ctx.memory.capacity)
+ ctx.close()
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
}
@Test
@@ -176,18 +160,16 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -202,19 +184,17 @@ class SimMachineTest {
val adapter = (machine.peripherals[0] as SimNetworkAdapter)
adapter.connect(SimNetworkSink(engine, adapter.bandwidth))
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- val iface = ctx.net[0]
- iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ val iface = ctx.net[0]
+ iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -226,19 +206,17 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- val disk = ctx.storage[0]
- disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ val disk = ctx.storage[0]
+ disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -250,19 +228,17 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- machine.run(object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- val lifecycle = SimWorkloadLifecycle(ctx)
- val disk = ctx.storage[0]
- disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8)))
- }
- })
-
- assertEquals(1250, clock.millis())
- } finally {
- machine.close()
- }
+ machine.runWorkload(object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ val disk = ctx.storage[0]
+ disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8)))
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+ })
+
+ assertEquals(1250, clock.millis())
}
@Test
@@ -275,13 +251,11 @@ class SimMachineTest {
try {
coroutineScope {
- launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
+ launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) }
cancel()
}
} catch (_: CancellationException) {
// Ignore
- } finally {
- machine.close()
}
assertEquals(0, clock.millis())
@@ -295,31 +269,14 @@ class SimMachineTest {
SimplePowerDriver(ConstantPowerModel(0.0))
)
- try {
- coroutineScope {
- launch {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
- }
+ coroutineScope {
+ launch {
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
+ }
- assertThrows<IllegalStateException> {
- machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
- }
+ assertThrows<IllegalStateException> {
+ machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0))
}
- } finally {
- machine.close()
}
}
-
- @Test
- fun testClose() = runBlockingSimulation {
- val machine = SimBareMetalMachine(
- FlowEngine(coroutineContext, clock),
- machineModel,
- SimplePowerDriver(ConstantPowerModel(0.0))
- )
-
- machine.close()
- assertDoesNotThrow { machine.close() }
- assertThrows<IllegalStateException> { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) }
- }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 6f32cf46..91855e8d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertDoesNotThrow
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -38,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -81,16 +81,16 @@ internal class SimFairShareHypervisorTest {
val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null)
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
println("Hypervisor finished")
}
yield()
- val vm = hypervisor.createMachine(model)
- vm.run(workloadA)
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadA)
yield()
- machine.close()
+ machine.cancel()
assertAll(
{ assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") },
@@ -132,22 +132,22 @@ internal class SimFairShareHypervisorTest {
val hypervisor = SimFairShareHypervisor(platform, null, null, null)
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
}
yield()
coroutineScope {
launch {
- val vm = hypervisor.createMachine(model)
- vm.run(workloadA)
- vm.close()
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadA)
+ hypervisor.removeMachine(vm)
}
- val vm = hypervisor.createMachine(model)
- vm.run(workloadB)
- vm.close()
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadB)
+ hypervisor.removeMachine(vm)
}
yield()
- machine.close()
+ machine.cancel()
yield()
assertAll(
@@ -172,11 +172,11 @@ internal class SimFairShareHypervisorTest {
assertDoesNotThrow {
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
}
}
- machine.close()
+ machine.cancel()
}
@Test
@@ -187,12 +187,11 @@ internal class SimFairShareHypervisorTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val groups = listOf(
- VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")),
- VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")),
- VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
- )
- val interferenceModel = VmInterferenceModel(groups)
+ val interferenceModel = VmInterferenceModel.builder()
+ .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b"))
+ .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c"))
+ .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
+ .build()
val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
@@ -221,20 +220,20 @@ internal class SimFairShareHypervisorTest {
)
launch {
- machine.run(hypervisor)
+ machine.runWorkload(hypervisor)
}
coroutineScope {
launch {
- val vm = hypervisor.createMachine(model, "a")
- vm.run(workloadA)
- vm.close()
+ val vm = hypervisor.newMachine(model, "a")
+ vm.runWorkload(workloadA)
+ hypervisor.removeMachine(vm)
}
- val vm = hypervisor.createMachine(model, "b")
- vm.run(workloadB)
- vm.close()
+ val vm = hypervisor.newMachine(model, "b")
+ vm.runWorkload(workloadB)
+ hypervisor.removeMachine(vm)
}
- machine.close()
+ machine.cancel()
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 02d308ff..823a0ae3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -36,6 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -76,13 +77,13 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
- val vm = hypervisor.createMachine(machineModel)
- vm.run(workloadA)
+ launch { machine.runWorkload(hypervisor) }
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(workloadA)
yield()
- vm.close()
- machine.close()
+ hypervisor.removeMachine(vm)
+ machine.cancel()
assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" }
}
@@ -98,12 +99,13 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- val vm = hypervisor.createMachine(machineModel)
- vm.run(workload)
- vm.close()
- machine.close()
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(workload)
+ hypervisor.removeMachine(vm)
+
+ machine.cancel()
assertEquals(duration, clock.millis()) { "Took enough time" }
}
@@ -121,11 +123,11 @@ internal class SimSpaceSharedHypervisorTest {
)
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- val vm = hypervisor.createMachine(machineModel)
- vm.run(workload)
- machine.close()
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(workload)
+ machine.cancel()
assertEquals(duration, clock.millis()) { "Took enough time" }
}
@@ -142,19 +144,20 @@ internal class SimSpaceSharedHypervisorTest {
)
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- val vm = hypervisor.createMachine(machineModel)
- vm.run(SimRuntimeWorkload(duration))
- vm.close()
+ val vm = hypervisor.newMachine(machineModel)
+ vm.runWorkload(SimRuntimeWorkload(duration))
+ hypervisor.removeMachine(vm)
yield()
- val vm2 = hypervisor.createMachine(machineModel)
- vm2.run(SimRuntimeWorkload(duration))
- vm2.close()
- machine.close()
+ val vm2 = hypervisor.newMachine(machineModel)
+ vm2.runWorkload(SimRuntimeWorkload(duration))
+ hypervisor.removeMachine(vm2)
+
+ machine.cancel()
assertEquals(duration * 2, clock.millis()) { "Took enough time" }
}
@@ -168,17 +171,17 @@ internal class SimSpaceSharedHypervisorTest {
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- hypervisor.createMachine(machineModel)
+ hypervisor.newMachine(machineModel)
assertAll(
{ assertFalse(hypervisor.canFit(machineModel)) },
- { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } }
+ { assertThrows<IllegalArgumentException> { hypervisor.newMachine(machineModel) } }
)
- machine.close()
+ machine.cancel()
}
/**
@@ -192,16 +195,16 @@ internal class SimSpaceSharedHypervisorTest {
)
val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null)
- launch { machine.run(hypervisor) }
+ launch { machine.runWorkload(hypervisor) }
yield()
- hypervisor.createMachine(machineModel).close()
+ hypervisor.removeMachine(hypervisor.newMachine(machineModel))
assertAll(
{ assertTrue(hypervisor.canFit(machineModel)) },
- { assertDoesNotThrow { hypervisor.createMachine(machineModel) } }
+ { assertDoesNotThrow { hypervisor.newMachine(machineModel) } }
)
- machine.close()
+ machine.cancel()
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index 574860e8..aa91984a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -30,6 +30,7 @@ import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.model.*
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -67,13 +68,9 @@ class SimTraceWorkloadTest {
offset = 0
)
- try {
- machine.run(workload)
+ machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(4000, clock.millis())
}
@Test
@@ -94,13 +91,9 @@ class SimTraceWorkloadTest {
offset = 1000
)
- try {
- machine.run(workload)
+ machine.runWorkload(workload)
- assertEquals(5000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(5000, clock.millis())
}
@Test
@@ -121,14 +114,10 @@ class SimTraceWorkloadTest {
offset = 0
)
- try {
- delay(1000L)
- machine.run(workload)
+ delay(1000L)
+ machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(4000, clock.millis())
}
@Test
@@ -149,12 +138,8 @@ class SimTraceWorkloadTest {
offset = 0
)
- try {
- machine.run(workload)
+ machine.runWorkload(workload)
- assertEquals(4000, clock.millis())
- } finally {
- machine.close()
- }
+ assertEquals(4000, clock.millis())
}
}