diff options
Diffstat (limited to 'opendc-simulator')
22 files changed, 835 insertions, 463 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index a2bb89c2..ca8b912a 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -35,7 +35,7 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcUtils) + implementation(libs.kotlin.logging) testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index cb52d24f..91e91f9d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -76,7 +76,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - return@runBlockingSimulation machine.run(SimTraceWorkload(trace)) + return@runBlockingSimulation machine.runWorkload(SimTraceWorkload(trace)) } } @@ -89,15 +89,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -111,15 +111,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -133,22 +133,22 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } coroutineScope { repeat(2) { - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) launch { try { - vm.run(SimTraceWorkload(trace)) + vm.runWorkload(SimTraceWorkload(trace)) } finally { - machine.close() + machine.cancel() } } } } - machine.close() + machine.cancel() } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt new file mode 100644 index 00000000..c23f48dc --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.workload.SimWorkload +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. + */ +public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) { + return suspendCancellableCoroutine { cont -> + cont.invokeOnCancellation { this@runWorkload.cancel() } + + startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + workload.onStart(ctx) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + try { + workload.onStop(ctx) + + if (!cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + }, + meta + ) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 60a10f20..6a4c594d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* +import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral import org.opendc.simulator.compute.model.MachineModel @@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. @@ -72,48 +71,20 @@ public abstract class SimAbstractMachine( public override val peripherals: List<SimPeripheral> = net.map { it as SimNetworkAdapter } /** - * A flag to indicate that the machine is terminated. + * The current active [Context]. */ - private var isTerminated = false + private var _ctx: Context? = null - /** - * The continuation to resume when the virtual machine workload has finished. - */ - private var cont: Continuation<Unit>? = null - - /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { - check(!isTerminated) { "Machine is terminated" } - check(cont == null) { "A machine cannot run concurrently" } - - val ctx = Context(meta) - - return suspendCancellableCoroutine { cont -> - this.cont = cont - - // Cancel all cpus on cancellation - cont.invokeOnCancellation { - this.cont = null - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - } + override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { + check(_ctx == null) { "A machine cannot run concurrently" } - engine.batch { workload.onStart(ctx) } - } + val ctx = Context(workload, meta) + ctx.start() + return ctx } - override fun close() { - if (isTerminated) { - return - } - - isTerminated = true - cancel() + override fun cancel() { + _ctx?.close() } override fun onConverge(now: Long, delta: Long) { @@ -121,29 +92,35 @@ public abstract class SimAbstractMachine( } /** - * Cancel the workload that is currently running on the machine. + * The execution context in which the workload runs. + * + * @param workload The workload that is running on the machine. + * @param meta The metadata passed to the workload. */ - private fun cancel() { - engine.batch { - for (cpu in cpus) { - cpu.cancel() + private inner class Context( + private val workload: SimWorkload, + override val meta: Map<String, Any> + ) : SimMachineContext { + /** + * A flag to indicate that the context has been closed. + */ + private var isClosed = false + + override val engine: FlowEngine = this@SimAbstractMachine.engine + + /** + * Start this context. + */ + fun start() { + try { + _ctx = this + engine.batch { workload.onStart(this) } + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStart callback" } + close() } } - val cont = cont - if (cont != null) { - this.cont = null - cont.resume(Unit) - } - } - - /** - * The execution context in which the workload runs. - */ - private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val engine: FlowEngine - get() = this@SimAbstractMachine.engine - override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus override val memory: SimMemory = this@SimAbstractMachine.memory @@ -152,7 +129,49 @@ public abstract class SimAbstractMachine( override val storage: List<SimStorageInterface> = this@SimAbstractMachine.storage - override fun close() = cancel() + override fun close() { + if (isClosed) { + return + } + + isClosed = true + assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" } + _ctx = null + + // Cancel all the resources associated with the machine + doCancel() + + try { + workload.onStop(this) + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStop callback" } + } + } + + /** + * Run the stop procedures for the resources associated with the machine. + */ + private fun doCancel() { + engine.batch { + for (cpu in cpus) { + cpu.cancel() + } + + memory.cancel() + + for (ifx in net) { + (ifx as NetworkAdapterImpl).disconnect() + } + + for (storage in storage) { + val impl = storage as StorageDeviceImpl + impl.read.cancel() + impl.write.cancel() + } + } + } + + override fun toString(): String = "SimAbstractMachine.Context" } /** @@ -166,7 +185,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( - private val engine: FlowEngine, + engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { @@ -208,4 +227,12 @@ public abstract class SimAbstractMachine( override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } + + private companion object { + /** + * The logging instance associated with this class. + */ + @JvmStatic + private val logger = KotlinLogging.logger {} + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ab0b56ae..94581e89 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A generic machine that is able to run a [SimWorkload]. */ -public interface SimMachine : AutoCloseable { +public interface SimMachine { /** * The model of the machine containing its specifications. */ @@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable { public val peripherals: List<SimPeripheral> /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Start the specified [SimWorkload] on this machine. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) + public fun startWorkload(workload: SimWorkload, meta: Map<String, Any> = emptyMap()): SimMachineContext /** - * Terminate this machine. + * Cancel the workload that is currently running on this machine. + * + * If no workload is active, this operation is a no-op. */ - public override fun close() + public fun cancel() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index f6d8f628..07465126 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,7 +28,9 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -92,13 +94,20 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf<ScalingGovernor.Logic>() /* SimHypervisor */ - override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } + override fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx @@ -121,6 +130,8 @@ public abstract class SimAbstractHypervisor( } } + override fun onStop(ctx: SimMachineContext) {} + private var _cpuCount = 0 private var _cpuCapacity = 0.0 @@ -141,33 +152,31 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + private inner class VirtualMachine( + model: MachineModel, + interferenceId: String? = null + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + /** * The interference key of this virtual machine. */ - private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } /** * The resource counters associated with the hypervisor. */ override val counters: SimHypervisorCounters get() = _counters - private val _counters = object : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() - override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() - override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong() - } + private val _counters = VmCountersImpl(cpus) /** * The CPU capacity of the hypervisor in MHz. @@ -187,14 +196,58 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) + override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + joinInterferenceDomain() + workload.onStart(ctx) + } catch (cause: Throwable) { + leaveInterferenceDomain() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + leaveInterferenceDomain() + workload.onStop(ctx) + } + }, + meta + ) + } + override fun close() { - super.close() + if (isClosed) { + return + } + + isClosed = true + cancel() for (cpu in cpus) { cpu.close() } + } - _vms.remove(this) + /** + * Join the interference domain of the hypervisor. + */ + private fun joinInterferenceDomain() { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + } + + /** + * Leave the interference domain of the hypervisor. + */ + private fun leaveInterferenceDomain() { + val interferenceKey = interferenceKey if (interferenceKey != null) { interferenceDomain?.leave(interferenceKey) } @@ -211,9 +264,7 @@ public abstract class SimAbstractHypervisor( ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity - set(_) { - // Ignore capacity changes - } + set(_) = TODO("Capacity changes on vCPU not supported") override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" @@ -287,4 +338,20 @@ public abstract class SimAbstractHypervisor( cpuTime[3] += (interferenceDelta * d).roundToLong() } } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + override val cpuIdleTime: Long + get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + override val cpuStealTime: Long + get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + override val cpuLostTime: Long + get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 57d4cf20..a69f419f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceId An identifier for the interference model. */ - public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + + /** + * Remove the specified [machine] from the hypervisor. + * + * @param machine The machine to remove. + */ + public fun removeMachine(machine: SimVirtualMachine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index b737d61a..09b03306 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey */ public interface VmInterferenceDomain : InterferenceDomain { /** - * Join this interference domain. + * Construct an [InterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. + * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine + * does not participate in the domain. */ - public fun join(id: String): InterferenceKey + public fun createKey(id: String): InterferenceKey? /** - * Leave this interference domain. + * Remove the specified [key] from this domain. + */ + public fun removeKey(key: InterferenceKey) + + /** + * Mark the specified [key] as active in this interference domain. + * + * @param key The key to join the interference domain with. + */ + public fun join(key: InterferenceKey) + + /** + * Mark the specified [key] as inactive in this interference domain. + * + * @param key The key of the virtual machine that wants to leave the domain. */ public fun leave(key: InterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt deleted file mode 100644 index 708ddede..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel.interference - -/** - * A group of virtual machines that together can interfere when operating on the same resources, causing performance - * variability. - */ -public data class VmInterferenceGroup( - /** - * The minimum load of the host before the interference occurs. - */ - public val targetLoad: Double, - - /** - * A score in [0, 1] representing the performance variability as a result of resource interference. - */ - public val score: Double, - - /** - * The members of this interference group. - */ - public val members: Set<String> -) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index b3d72507..977292be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -28,143 +28,366 @@ import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param groups The groups of virtual machines that interfere with each other. - * @param random The [Random] instance to select the affected virtual machines. + * @param targets The target load of each group. + * @param scores The performance score of each group. + * @param members The members belonging to each group. + * @param membership The identifier of each key. + * @param size The number of groups. + * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ -public class VmInterferenceModel( - private val groups: List<VmInterferenceGroup>, - private val random: Random = Random(0) +public class VmInterferenceModel private constructor( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map<String, Int>, + private val members: Array<IntArray>, + private val membership: Array<IntArray>, + private val size: Int, + seed: Long, ) { /** + * A [SplittableRandom] used for selecting the virtual machines that are affected. + */ + private val random = SplittableRandom(seed) + + /** * Construct a new [VmInterferenceDomain]. */ - public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) + + /** + * Create a copy of this model with a different seed. + */ + public fun withSeed(seed: Long): VmInterferenceModel { + return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + } + + public companion object { /** - * The stateful groups of this domain. + * Construct a [Builder] instance. */ - private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + @JvmStatic + public fun builder(): Builder = Builder() + } + /** + * Builder class for a [VmInterferenceModel] + */ + public class Builder internal constructor() { /** - * The set of keys active in this domain. + * The initial capacity of the builder. */ - private val keys = mutableSetOf<InterferenceKeyImpl>() + private val INITIAL_CAPACITY = 256 - override fun join(id: String): InterferenceKey { - val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) - keys += key - return key - } + /** + * The target load of each group. + */ + private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } - override fun leave(key: InterferenceKey) { - if (key is InterferenceKeyImpl) { - keys -= key - key.leave() + /** + * The performance score of each group. + */ + private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } + + /** + * The members of each group. + */ + private var _members = ArrayList<Set<String>>(INITIAL_CAPACITY) + + /** + * The mapping from member to group id. + */ + private val ids = TreeSet<String>() + + /** + * The number of groups in the model. + */ + private var size = 0 + + /** + * Add the specified group to the model. + */ + public fun addGroup(members: Set<String>, targetLoad: Double, score: Double): Builder { + val size = size + + if (size == _targets.size) { + grow() } + + _targets[size] = targetLoad + _scores[size] = score + _members.add(members) + ids.addAll(members) + + this.size++ + + return this } - override fun apply(key: InterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } + /** + * Build the [VmInterferenceModel]. + */ + public fun build(seed: Long = 0): VmInterferenceModel { + val size = size + val targets = _targets + val scores = _scores + val members = _members - val ctx = key.findGroup(load) - val group = ctx?.group + val indices = Array(size) { it } + indices.sortWith( + Comparator { l, r -> + var cmp = targets[l].compareTo(targets[r]) // Order by target load + if (cmp != 0) { + return@Comparator cmp + } - // Apply performance penalty to (on average) only one of the VMs - return if (group != null && random.nextInt(group.members.size) == 0) { - group.score - } else { - 1.0 + cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first) + if (cmp != 0) + cmp + else + l.compareTo(r) + } + ) + + val newTargets = DoubleArray(size) + val newScores = DoubleArray(size) + val newMembers = arrayOfNulls<IntArray>(size) + + var nextId = 0 + val idMapping = ids.associateWith { nextId++ } + val membership = ids.associateWithTo(TreeMap()) { ArrayList<Int>() } + + for ((group, j) in indices.withIndex()) { + newTargets[group] = targets[j] + newScores[group] = scores[j] + val groupMembers = members[j] + val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray() + + newGroupMembers.sort() + newMembers[group] = newGroupMembers + + for (member in groupMembers) { + membership.getValue(member).add(group) + } } + + @Suppress("UNCHECKED_CAST") + return VmInterferenceModel( + newTargets, + newScores, + idMapping, + newMembers as Array<IntArray>, + membership.map { it.value.toIntArray() }.toTypedArray(), + size, + seed + ) } - override fun toString(): String = "VmInterferenceDomain" + /** + * Helper function to grow the capacity of the internal arrays. + */ + private fun grow() { + val oldSize = _targets.size + val newSize = oldSize + (oldSize shr 1) + + _targets = _targets.copyOf(newSize) + _scores = _scores.copyOf(newSize) + } } /** - * An interference key. - * - * @param id The identifier of the member. - * @param groups The groups to which the key belongs. + * Internal implementation of [VmInterferenceDomain]. */ - private inner class InterferenceKeyImpl(val id: String, private val groups: List<GroupContext>) : InterferenceKey { - init { - for (group in groups) { - group.join(this) - } - } + private class InterferenceDomainImpl( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map<String, Int>, + private val members: Array<IntArray>, + private val membership: Array<IntArray>, + private val random: SplittableRandom + ) : VmInterferenceDomain { + /** + * Keys registered with this domain. + */ + private val keys = HashMap<Int, InterferenceKeyImpl>() /** - * Find the active group that applies for the interference member. + * The set of keys active in this domain. */ - fun findGroup(load: Double): GroupContext? { - // Find the first active group whose target load is lower than the current load - val index = groups.binarySearchBy(load) { it.group.targetLoad } - val target = if (index >= 0) index else -(index + 1) + private val activeKeys = ArrayList<InterferenceKeyImpl>() - // Check whether there are active groups ahead of the index - for (i in target until groups.size) { - val group = groups[i] - if (group.group.targetLoad > load) { - break - } else if (group.isActive) { - return group + override fun createKey(id: String): InterferenceKey? { + val intId = idMapping[id] ?: return null + return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } + } + + override fun removeKey(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (activeKeys.remove(key)) { + computeActiveGroups(key.id) + } + + keys.remove(key.id) + } + + override fun join(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (key.acquire()) { + val pos = activeKeys.binarySearch(key) + if (pos < 0) { + activeKeys.add(-pos - 1, key) } + computeActiveGroups(key.id) } + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl && key.release()) { + activeKeys.remove(key) + computeActiveGroups(key.id) + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val groups = key.groups + val groupSize = groups.size + + if (groupSize == 0) { + return 1.0 + } + + val targets = targets + val scores = scores + var low = 0 + var high = groups.size - 1 - // Check whether there are active groups before the index - for (i in (target - 1) downTo 0) { - val group = groups[i] - if (group.isActive) { - return group + var group = -1 + var score = 1.0 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + score = scores[midGroup] + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + score = scores[midGroup] + break } } - return null + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + score + } else { + 1.0 + } } + override fun toString(): String = "VmInterferenceDomain" + /** - * Leave all the groups. + * Queue of participants that will be removed or added to the active groups. */ - fun leave() { + private val _participants = ArrayDeque<InterferenceKeyImpl>() + + /** + * (Re-)Compute the active groups. + */ + private fun computeActiveGroups(id: Int) { + val activeKeys = activeKeys + val groups = membership[id] + + if (activeKeys.isEmpty()) { + return + } + + val members = members + val participants = _participants + for (group in groups) { - group.leave(this) + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + participants.add(rightEntry) + intersection++ + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + val participantGroups = participant.groups + if (intersection <= 1) { + participantGroups.remove(group) + } else { + val pos = participantGroups.binarySearch(group) + if (pos < 0) { + participantGroups.add(-pos - 1, group) + } + } + } } } } /** - * A group context is used to track the active keys per interference group. + * An interference key. + * + * @param id The identifier of the member. */ - private inner class GroupContext(val group: VmInterferenceGroup) { + private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> { /** - * The active keys that are part of this group. + * The active groups to which the key belongs. */ - private val keys = mutableSetOf<InterferenceKeyImpl>() + @JvmField val groups: MutableList<Int> = ArrayList() /** - * A flag to indicate that the group is active. + * The number of users of the interference key. */ - val isActive - get() = keys.size > 1 + private var refCount: Int = 0 /** - * Determine whether the specified [id] is part of this group. + * Join the domain. */ - operator fun contains(id: String): Boolean = id in group.members + fun acquire(): Boolean = refCount++ <= 0 /** - * Join this group with the specified [key]. + * Leave the domain. */ - fun join(key: InterferenceKeyImpl) { - keys += key - } + fun release(): Boolean = --refCount <= 0 - /** - * Leave this group with the specified [key]. - */ - fun leave(key: InterferenceKeyImpl) { - keys -= key - } + override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 99f4a1e1..726d1f56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -48,5 +48,7 @@ public class SimFlopsWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 2ef3bc43..8a3f5f84 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -48,5 +48,7 @@ public class SimRuntimeWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 53c98409..ce04a790 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimTraceWorkload" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index b80665fa..61c6e2ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -37,4 +37,11 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) + + /** + * This method is invoked when the workload is stopped. + * + * @param ctx The execution context in which the machine runs. + */ + public fun onStop(ctx: SimMachineContext) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index cc4f1f6a..742470a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf<FlowSource>() + private val waiting = HashSet<Wrapper>() /** - * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + * Wait for the specified [source] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: FlowSource): FlowSource { - waiting.add(consumer) - return object : FlowSource by consumer { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - try { - consumer.onStop(conn, now, delta) - } finally { - complete(consumer) - } - } - override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" - } + public fun waitFor(source: FlowSource): FlowSource { + val wrapper = Wrapper(source) + waiting.add(wrapper) + return wrapper } /** - * Complete the specified [FlowSource]. + * Complete the specified [Wrapper]. */ - private fun complete(consumer: FlowSource) { - if (waiting.remove(consumer) && waiting.isEmpty()) { + private fun complete(wrapper: Wrapper) { + if (waiting.remove(wrapper) && waiting.isEmpty()) { ctx.close() } } + + /** + * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that is has completed. + */ + private inner class Wrapper(private val delegate: FlowSource) : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + delegate.onStart(conn, now) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { + complete(this) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]" + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 0bb24ed8..644eb497 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -65,14 +65,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, clock.millis()) - } finally { - machine.close() - } + // Two cores execute 1000 MFlOps per second (1000 ms) + assertEquals(1000, clock.millis()) } @Test @@ -88,14 +84,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two sockets with two cores execute 2000 MFlOps per second (500 ms) - assertEquals(500, clock.millis()) - } finally { - machine.close() - } + // Two sockets with two cores execute 2000 MFlOps per second (500 ms) + assertEquals(500, clock.millis()) } @Test @@ -109,16 +101,12 @@ class SimMachineTest { val source = SimPowerSource(engine, capacity = 1000.0) source.connect(machine.psu) - try { - coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - assertAll( - { assertEquals(100.0, machine.psu.powerDraw) }, - { assertEquals(100.0, source.powerDraw) } - ) - } - } finally { - machine.close() + coroutineScope { + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } + assertAll( + { assertEquals(100.0, machine.psu.powerDraw) }, + { assertEquals(100.0, source.powerDraw) } + ) } } @@ -130,22 +118,20 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val cpu = ctx.cpus[0] - - cpu.capacity = cpu.model.frequency + 1000.0 - assertEquals(cpu.model.frequency, cpu.capacity) - cpu.capacity = -1.0 - assertEquals(0.0, cpu.capacity) - - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val cpu = ctx.cpus[0] + + cpu.capacity = cpu.model.frequency + 1000.0 + assertEquals(cpu.model.frequency, cpu.capacity) + cpu.capacity = -1.0 + assertEquals(0.0, cpu.capacity) + + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -156,16 +142,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - assertEquals(32_000 * 4.0, ctx.memory.capacity) - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + assertEquals(32_000 * 4.0, ctx.memory.capacity) + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -176,18 +160,16 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -202,19 +184,17 @@ class SimMachineTest { val adapter = (machine.peripherals[0] as SimNetworkAdapter) adapter.connect(SimNetworkSink(engine, adapter.bandwidth)) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val iface = ctx.net[0] - iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val iface = ctx.net[0] + iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -226,19 +206,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -250,19 +228,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -275,13 +251,11 @@ class SimMachineTest { try { coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } cancel() } } catch (_: CancellationException) { // Ignore - } finally { - machine.close() } assertEquals(0, clock.millis()) @@ -295,31 +269,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - coroutineScope { - launch { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + coroutineScope { + launch { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) + } - assertThrows<IllegalStateException> { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + assertThrows<IllegalStateException> { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } - } finally { - machine.close() } } - - @Test - fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine( - FlowEngine(coroutineContext, clock), - machineModel, - SimplePowerDriver(ConstantPowerModel(0.0)) - ) - - machine.close() - assertDoesNotThrow { machine.close() } - assertThrows<IllegalStateException> { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 6f32cf46..91855e8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -38,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -81,16 +81,16 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) println("Hypervisor finished") } yield() - val vm = hypervisor.createMachine(model) - vm.run(workloadA) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) yield() - machine.close() + machine.cancel() assertAll( { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") }, @@ -132,22 +132,22 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, null, null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } yield() coroutineScope { launch { - val vm = hypervisor.createMachine(model) - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model) - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } yield() - machine.close() + machine.cancel() yield() assertAll( @@ -172,11 +172,11 @@ internal class SimFairShareHypervisorTest { assertDoesNotThrow { launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } } - machine.close() + machine.cancel() } @Test @@ -187,12 +187,11 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val groups = listOf( - VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), - VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), - VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) - ) - val interferenceModel = VmInterferenceModel(groups) + val interferenceModel = VmInterferenceModel.builder() + .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")) + .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) + .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + .build() val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( @@ -221,20 +220,20 @@ internal class SimFairShareHypervisorTest { ) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } coroutineScope { launch { - val vm = hypervisor.createMachine(model, "a") - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model, "a") + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model, "b") - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model, "b") + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 02d308ff..823a0ae3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -76,13 +77,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } - val vm = hypervisor.createMachine(machineModel) - vm.run(workloadA) + launch { machine.runWorkload(hypervisor) } + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workloadA) yield() - vm.close() - machine.close() + hypervisor.removeMachine(vm) + machine.cancel() assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } @@ -98,12 +99,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - vm.close() - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + hypervisor.removeMachine(vm) + + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -121,11 +123,11 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -142,19 +144,20 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(SimRuntimeWorkload(duration)) - vm.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm) yield() - val vm2 = hypervisor.createMachine(machineModel) - vm2.run(SimRuntimeWorkload(duration)) - vm2.close() - machine.close() + val vm2 = hypervisor.newMachine(machineModel) + vm2.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm2) + + machine.cancel() assertEquals(duration * 2, clock.millis()) { "Took enough time" } } @@ -168,17 +171,17 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel) + hypervisor.newMachine(machineModel) assertAll( { assertFalse(hypervisor.canFit(machineModel)) }, - { assertThrows<IllegalArgumentException> { hypervisor.createMachine(machineModel) } } + { assertThrows<IllegalArgumentException> { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } /** @@ -192,16 +195,16 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel).close() + hypervisor.removeMachine(hypervisor.newMachine(machineModel)) assertAll( { assertTrue(hypervisor.canFit(machineModel)) }, - { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + { assertDoesNotThrow { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 574860e8..aa91984a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -30,6 +30,7 @@ import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -67,13 +68,9 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -94,13 +91,9 @@ class SimTraceWorkloadTest { offset = 1000 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(5000, clock.millis()) - } finally { - machine.close() - } + assertEquals(5000, clock.millis()) } @Test @@ -121,14 +114,10 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - delay(1000L) - machine.run(workload) + delay(1000L) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -149,12 +138,8 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 04ba7f21..a7877546 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -62,13 +62,21 @@ public interface FlowMultiplexer { public val counters: FlowCounters /** - * Create a new input on this multiplexer. + * Create a new input on this multiplexer with a coupled capacity. * * @param key The key of the interference member to which the input belongs. */ public fun newInput(key: InterferenceKey? = null): FlowConsumer /** + * Create a new input on this multiplexer with the specified [capacity]. + * + * @param capacity The capacity of the input. + * @param key The key of the interference member to which the input belongs. + */ + public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + + /** * Remove [input] from this multiplexer. */ public fun removeInput(input: FlowConsumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 125d10fe..b68a8baa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -78,6 +78,8 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return input } + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index a0fb8a4e..3d26efda 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -86,7 +86,15 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) + return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + } + + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { + return newInput(isCoupled = false, capacity, key) + } + + private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { + val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -206,7 +214,10 @@ public class MaxMinFlowMultiplexer( // Disable timers and convergence of the source if one of the output manages it input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput - input.capacity = capacity + + if (input.isCoupled) { + input.capacity = capacity + } trigger(_clock.millis()) } @@ -340,7 +351,9 @@ public class MaxMinFlowMultiplexer( capacity = newCapacity for (input in _activeInputs) { - input.capacity = newCapacity + if (input.isCoupled) { + input.capacity = newCapacity + } } // Sort outputs by their capacity @@ -495,6 +508,7 @@ public class MaxMinFlowMultiplexer( private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, @JvmField val key: InterferenceKey?, + @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { /** |
