From e76bebe9e81c3813422da6d67fbab7d9f471a317 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 15 Oct 2021 14:43:43 +0200 Subject: perf(compute): Redesign VM interference algorithm This change redesigns the virtual machine interference algorithm to have a fixed memory usage per `VmInterferenceModel` instance. Previously, for every interference domain, a copy of the model would be created, leading to OutOfMemory errors when running multiple experiments at the same time. --- .../opendc/simulator/compute/SimAbstractMachine.kt | 82 +++-- .../compute/kernel/SimAbstractHypervisor.kt | 30 +- .../kernel/interference/VmInterferenceDomain.kt | 22 +- .../kernel/interference/VmInterferenceGroup.kt | 44 --- .../kernel/interference/VmInterferenceModel.kt | 385 ++++++++++++++++----- .../compute/kernel/SimFairShareHypervisorTest.kt | 12 +- 6 files changed, 401 insertions(+), 174 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 60a10f20..5909d980 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -77,33 +77,41 @@ public abstract class SimAbstractMachine( private var isTerminated = false /** - * The continuation to resume when the virtual machine workload has finished. + * The current active [Context]. */ - private var cont: Continuation? = null + private var _ctx: Context? = null + + /** + * This method is invoked when the machine is started. + */ + protected open fun onStart(ctx: SimMachineContext) {} + + /** + * This method is invoked when the machine is stopped. + */ + protected open fun onStop(ctx: SimMachineContext) { + _ctx = null + } /** * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ override suspend fun run(workload: SimWorkload, meta: Map) { check(!isTerminated) { "Machine is terminated" } - check(cont == null) { "A machine cannot run concurrently" } - - val ctx = Context(meta) + check(_ctx == null) { "A machine cannot run concurrently" } return suspendCancellableCoroutine { cont -> - this.cont = cont + val ctx = Context(meta, cont) + _ctx = ctx // Cancel all cpus on cancellation - cont.invokeOnCancellation { - this.cont = null - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - } + cont.invokeOnCancellation { ctx.close() } - engine.batch { workload.onStart(ctx) } + engine.batch { + onStart(ctx) + + workload.onStart(ctx) + } } } @@ -113,34 +121,22 @@ public abstract class SimAbstractMachine( } isTerminated = true - cancel() + _ctx?.close() } override fun onConverge(now: Long, delta: Long) { parent?.onConverge(now, delta) } - /** - * Cancel the workload that is currently running on the machine. - */ - private fun cancel() { - engine.batch { - for (cpu in cpus) { - cpu.cancel() - } - } - - val cont = cont - if (cont != null) { - this.cont = null - cont.resume(Unit) - } - } - /** * The execution context in which the workload runs. */ - private inner class Context(override val meta: Map) : SimMachineContext { + private inner class Context(override val meta: Map, private val cont: Continuation) : SimMachineContext { + /** + * A flag to indicate that the context has been closed. + */ + private var isClosed = false + override val engine: FlowEngine get() = this@SimAbstractMachine.engine @@ -152,7 +148,21 @@ public abstract class SimAbstractMachine( override val storage: List = this@SimAbstractMachine.storage - override fun close() = cancel() + override fun close() { + if (isClosed) { + return + } + + isClosed = true + engine.batch { + for (cpu in cpus) { + cpu.cancel() + } + } + + onStop(this) + cont.resume(Unit) + } } /** @@ -166,7 +176,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( - private val engine: FlowEngine, + engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index f6d8f628..90bf5e25 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -29,6 +29,7 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -141,11 +142,14 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + private inner class VirtualMachine( + model: MachineModel, + private val interferenceId: String? = null + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { /** * The interference key of this virtual machine. */ - private val interferenceKey = interferenceId?.let { interferenceDomain?.join(interferenceId) } + private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. @@ -187,6 +191,24 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) + override fun onStart(ctx: SimMachineContext) { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + + super.onStart(ctx) + } + + override fun onStop(ctx: SimMachineContext) { + super.onStop(ctx) + + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.leave(interferenceKey) + } + } + override fun close() { super.close() @@ -195,8 +217,10 @@ public abstract class SimAbstractHypervisor( } _vms.remove(this) + + val interferenceKey = interferenceKey if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) + interferenceDomain?.removeKey(interferenceKey) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index b737d61a..09b03306 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -30,14 +30,30 @@ import org.opendc.simulator.flow.interference.InterferenceKey */ public interface VmInterferenceDomain : InterferenceDomain { /** - * Join this interference domain. + * Construct an [InterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. + * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine + * does not participate in the domain. */ - public fun join(id: String): InterferenceKey + public fun createKey(id: String): InterferenceKey? /** - * Leave this interference domain. + * Remove the specified [key] from this domain. + */ + public fun removeKey(key: InterferenceKey) + + /** + * Mark the specified [key] as active in this interference domain. + * + * @param key The key to join the interference domain with. + */ + public fun join(key: InterferenceKey) + + /** + * Mark the specified [key] as inactive in this interference domain. + * + * @param key The key of the virtual machine that wants to leave the domain. */ public fun leave(key: InterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt deleted file mode 100644 index 708ddede..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceGroup.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel.interference - -/** - * A group of virtual machines that together can interfere when operating on the same resources, causing performance - * variability. - */ -public data class VmInterferenceGroup( - /** - * The minimum load of the host before the interference occurs. - */ - public val targetLoad: Double, - - /** - * A score in [0, 1] representing the performance variability as a result of resource interference. - */ - public val score: Double, - - /** - * The members of this interference group. - */ - public val members: Set -) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index b3d72507..977292be 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -28,143 +28,366 @@ import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param groups The groups of virtual machines that interfere with each other. - * @param random The [Random] instance to select the affected virtual machines. + * @param targets The target load of each group. + * @param scores The performance score of each group. + * @param members The members belonging to each group. + * @param membership The identifier of each key. + * @param size The number of groups. + * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ -public class VmInterferenceModel( - private val groups: List, - private val random: Random = Random(0) +public class VmInterferenceModel private constructor( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map, + private val members: Array, + private val membership: Array, + private val size: Int, + seed: Long, ) { + /** + * A [SplittableRandom] used for selecting the virtual machines that are affected. + */ + private val random = SplittableRandom(seed) + /** * Construct a new [VmInterferenceDomain]. */ - public fun newDomain(): VmInterferenceDomain = object : VmInterferenceDomain { + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) + + /** + * Create a copy of this model with a different seed. + */ + public fun withSeed(seed: Long): VmInterferenceModel { + return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + } + + public companion object { /** - * The stateful groups of this domain. + * Construct a [Builder] instance. */ - private val groups = this@VmInterferenceModel.groups.map { GroupContext(it) } + @JvmStatic + public fun builder(): Builder = Builder() + } + /** + * Builder class for a [VmInterferenceModel] + */ + public class Builder internal constructor() { /** - * The set of keys active in this domain. + * The initial capacity of the builder. */ - private val keys = mutableSetOf() + private val INITIAL_CAPACITY = 256 - override fun join(id: String): InterferenceKey { - val key = InterferenceKeyImpl(id, groups.filter { id in it }.sortedBy { it.group.targetLoad }) - keys += key - return key - } + /** + * The target load of each group. + */ + private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } - override fun leave(key: InterferenceKey) { - if (key is InterferenceKeyImpl) { - keys -= key - key.leave() + /** + * The performance score of each group. + */ + private var _scores = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } + + /** + * The members of each group. + */ + private var _members = ArrayList>(INITIAL_CAPACITY) + + /** + * The mapping from member to group id. + */ + private val ids = TreeSet() + + /** + * The number of groups in the model. + */ + private var size = 0 + + /** + * Add the specified group to the model. + */ + public fun addGroup(members: Set, targetLoad: Double, score: Double): Builder { + val size = size + + if (size == _targets.size) { + grow() } + + _targets[size] = targetLoad + _scores[size] = score + _members.add(members) + ids.addAll(members) + + this.size++ + + return this } - override fun apply(key: InterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } + /** + * Build the [VmInterferenceModel]. + */ + public fun build(seed: Long = 0): VmInterferenceModel { + val size = size + val targets = _targets + val scores = _scores + val members = _members - val ctx = key.findGroup(load) - val group = ctx?.group + val indices = Array(size) { it } + indices.sortWith( + Comparator { l, r -> + var cmp = targets[l].compareTo(targets[r]) // Order by target load + if (cmp != 0) { + return@Comparator cmp + } - // Apply performance penalty to (on average) only one of the VMs - return if (group != null && random.nextInt(group.members.size) == 0) { - group.score - } else { - 1.0 + cmp = scores[l].compareTo(scores[r]) // Higher penalty first (this means lower performance score first) + if (cmp != 0) + cmp + else + l.compareTo(r) + } + ) + + val newTargets = DoubleArray(size) + val newScores = DoubleArray(size) + val newMembers = arrayOfNulls(size) + + var nextId = 0 + val idMapping = ids.associateWith { nextId++ } + val membership = ids.associateWithTo(TreeMap()) { ArrayList() } + + for ((group, j) in indices.withIndex()) { + newTargets[group] = targets[j] + newScores[group] = scores[j] + val groupMembers = members[j] + val newGroupMembers = groupMembers.map { idMapping.getValue(it) }.toIntArray() + + newGroupMembers.sort() + newMembers[group] = newGroupMembers + + for (member in groupMembers) { + membership.getValue(member).add(group) + } } + + @Suppress("UNCHECKED_CAST") + return VmInterferenceModel( + newTargets, + newScores, + idMapping, + newMembers as Array, + membership.map { it.value.toIntArray() }.toTypedArray(), + size, + seed + ) } - override fun toString(): String = "VmInterferenceDomain" + /** + * Helper function to grow the capacity of the internal arrays. + */ + private fun grow() { + val oldSize = _targets.size + val newSize = oldSize + (oldSize shr 1) + + _targets = _targets.copyOf(newSize) + _scores = _scores.copyOf(newSize) + } } /** - * An interference key. - * - * @param id The identifier of the member. - * @param groups The groups to which the key belongs. + * Internal implementation of [VmInterferenceDomain]. */ - private inner class InterferenceKeyImpl(val id: String, private val groups: List) : InterferenceKey { - init { - for (group in groups) { - group.join(this) - } - } + private class InterferenceDomainImpl( + private val targets: DoubleArray, + private val scores: DoubleArray, + private val idMapping: Map, + private val members: Array, + private val membership: Array, + private val random: SplittableRandom + ) : VmInterferenceDomain { + /** + * Keys registered with this domain. + */ + private val keys = HashMap() /** - * Find the active group that applies for the interference member. + * The set of keys active in this domain. */ - fun findGroup(load: Double): GroupContext? { - // Find the first active group whose target load is lower than the current load - val index = groups.binarySearchBy(load) { it.group.targetLoad } - val target = if (index >= 0) index else -(index + 1) + private val activeKeys = ArrayList() - // Check whether there are active groups ahead of the index - for (i in target until groups.size) { - val group = groups[i] - if (group.group.targetLoad > load) { - break - } else if (group.isActive) { - return group + override fun createKey(id: String): InterferenceKey? { + val intId = idMapping[id] ?: return null + return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } + } + + override fun removeKey(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (activeKeys.remove(key)) { + computeActiveGroups(key.id) + } + + keys.remove(key.id) + } + + override fun join(key: InterferenceKey) { + if (key !is InterferenceKeyImpl) { + return + } + + if (key.acquire()) { + val pos = activeKeys.binarySearch(key) + if (pos < 0) { + activeKeys.add(-pos - 1, key) } + computeActiveGroups(key.id) } + } + + override fun leave(key: InterferenceKey) { + if (key is InterferenceKeyImpl && key.release()) { + activeKeys.remove(key) + computeActiveGroups(key.id) + } + } + + override fun apply(key: InterferenceKey?, load: Double): Double { + if (key == null || key !is InterferenceKeyImpl) { + return 1.0 + } + + val groups = key.groups + val groupSize = groups.size + + if (groupSize == 0) { + return 1.0 + } + + val targets = targets + val scores = scores + var low = 0 + var high = groups.size - 1 - // Check whether there are active groups before the index - for (i in (target - 1) downTo 0) { - val group = groups[i] - if (group.isActive) { - return group + var group = -1 + var score = 1.0 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + score = scores[midGroup] + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + score = scores[midGroup] + break } } - return null + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + score + } else { + 1.0 + } } + override fun toString(): String = "VmInterferenceDomain" + /** - * Leave all the groups. + * Queue of participants that will be removed or added to the active groups. */ - fun leave() { + private val _participants = ArrayDeque() + + /** + * (Re-)Compute the active groups. + */ + private fun computeActiveGroups(id: Int) { + val activeKeys = activeKeys + val groups = membership[id] + + if (activeKeys.isEmpty()) { + return + } + + val members = members + val participants = _participants + for (group in groups) { - group.leave(this) + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + participants.add(rightEntry) + intersection++ + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + val participantGroups = participant.groups + if (intersection <= 1) { + participantGroups.remove(group) + } else { + val pos = participantGroups.binarySearch(group) + if (pos < 0) { + participantGroups.add(-pos - 1, group) + } + } + } } } } /** - * A group context is used to track the active keys per interference group. + * An interference key. + * + * @param id The identifier of the member. */ - private inner class GroupContext(val group: VmInterferenceGroup) { + private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable { /** - * The active keys that are part of this group. + * The active groups to which the key belongs. */ - private val keys = mutableSetOf() + @JvmField val groups: MutableList = ArrayList() /** - * A flag to indicate that the group is active. + * The number of users of the interference key. */ - val isActive - get() = keys.size > 1 + private var refCount: Int = 0 /** - * Determine whether the specified [id] is part of this group. + * Join the domain. */ - operator fun contains(id: String): Boolean = id in group.members + fun acquire(): Boolean = refCount++ <= 0 /** - * Join this group with the specified [key]. + * Leave the domain. */ - fun join(key: InterferenceKeyImpl) { - keys += key - } + fun release(): Boolean = --refCount <= 0 - /** - * Leave this group with the specified [key]. - */ - fun leave(key: InterferenceKeyImpl) { - keys -= key - } + override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 6f32cf46..b7f5bf8e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -30,7 +30,6 @@ import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertDoesNotThrow import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceGroup import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -187,12 +186,11 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val groups = listOf( - VmInterferenceGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")), - VmInterferenceGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")), - VmInterferenceGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) - ) - val interferenceModel = VmInterferenceModel(groups) + val interferenceModel = VmInterferenceModel.builder() + .addGroup(targetLoad = 0.0, score = 0.9, members = setOf("a", "b")) + .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) + .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) + .build() val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( -- cgit v1.2.3 From ba310a3545c9631e1e4ff61a0a1759228ec5cf63 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 16 Oct 2021 16:52:49 +0200 Subject: fix(simulator): Use correct flow input capacity for counters This change fixes an issue with the FlowMultiplexer implementation where the capacity of each flow input was equal to the capacity of all flow outputs. Now, the user can specify the capacity of the input, which will be used to correctly compute the active and idle time. --- .../compute/kernel/SimAbstractHypervisor.kt | 36 ++++++++++++---------- .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 10 +++++- .../flow/mux/ForwardingFlowMultiplexer.kt | 2 ++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 20 ++++++++++-- 4 files changed, 48 insertions(+), 20 deletions(-) (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 90bf5e25..eda59d2d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -144,7 +144,7 @@ public abstract class SimAbstractHypervisor( */ private inner class VirtualMachine( model: MachineModel, - private val interferenceId: String? = null + interferenceId: String? = null ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { /** * The interference key of this virtual machine. @@ -154,24 +154,14 @@ public abstract class SimAbstractHypervisor( /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } /** * The resource counters associated with the hypervisor. */ override val counters: SimHypervisorCounters get() = _counters - private val _counters = object : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() - override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() - override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong() - } + private val _counters = VmCountersImpl(cpus) /** * The CPU capacity of the hypervisor in MHz. @@ -235,9 +225,7 @@ public abstract class SimAbstractHypervisor( ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity - set(_) { - // Ignore capacity changes - } + set(_) = TODO("Capacity changes on vCPU not supported") override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" @@ -311,4 +299,20 @@ public abstract class SimAbstractHypervisor( cpuTime[3] += (interferenceDelta * d).roundToLong() } } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private class VmCountersImpl(private val cpus: List) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + override val cpuIdleTime: Long + get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + override val cpuStealTime: Long + get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + override val cpuLostTime: Long + get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 04ba7f21..a7877546 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -62,12 +62,20 @@ public interface FlowMultiplexer { public val counters: FlowCounters /** - * Create a new input on this multiplexer. + * Create a new input on this multiplexer with a coupled capacity. * * @param key The key of the interference member to which the input belongs. */ public fun newInput(key: InterferenceKey? = null): FlowConsumer + /** + * Create a new input on this multiplexer with the specified [capacity]. + * + * @param capacity The capacity of the input. + * @param key The key of the interference member to which the input belongs. + */ + public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + /** * Remove [input] from this multiplexer. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 125d10fe..b68a8baa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -78,6 +78,8 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return input } + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index a0fb8a4e..3d26efda 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -86,7 +86,15 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) + return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + } + + override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { + return newInput(isCoupled = false, capacity, key) + } + + private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { + val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -206,7 +214,10 @@ public class MaxMinFlowMultiplexer( // Disable timers and convergence of the source if one of the output manages it input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput - input.capacity = capacity + + if (input.isCoupled) { + input.capacity = capacity + } trigger(_clock.millis()) } @@ -340,7 +351,9 @@ public class MaxMinFlowMultiplexer( capacity = newCapacity for (input in _activeInputs) { - input.capacity = newCapacity + if (input.isCoupled) { + input.capacity = newCapacity + } } // Sort outputs by their capacity @@ -495,6 +508,7 @@ public class MaxMinFlowMultiplexer( private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, @JvmField val key: InterferenceKey?, + @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable { /** -- cgit v1.2.3 From 86c65e875b7dde8872dc81a37aa9dca72eee7782 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 19 Oct 2021 17:27:01 +0200 Subject: refactor(simulator): Support running workloads without coroutines This change updates the SimMachine interface to drop the coroutine requirement for running a workload on a machines. Users can now asynchronously start a workload and receive notifications via the workload callbacks. Users still have the possibility to suspend execution during workload execution by using the new `runWorkload` method, which is implemented on top of the new `startWorkload` primitive. --- .../opendc-simulator-compute/build.gradle.kts | 2 +- .../simulator/compute/SimMachineBenchmarks.kt | 32 ++-- .../org/opendc/simulator/compute/Coroutines.kt | 69 +++++++ .../opendc/simulator/compute/SimAbstractMachine.kt | 115 ++++++----- .../org/opendc/simulator/compute/SimMachine.kt | 17 +- .../compute/kernel/SimAbstractHypervisor.kt | 81 ++++++-- .../simulator/compute/kernel/SimHypervisor.kt | 9 +- .../simulator/compute/workload/SimFlopsWorkload.kt | 2 + .../compute/workload/SimRuntimeWorkload.kt | 2 + .../simulator/compute/workload/SimTraceWorkload.kt | 2 + .../simulator/compute/workload/SimWorkload.kt | 7 + .../compute/workload/SimWorkloadLifecycle.kt | 53 ++++-- .../org/opendc/simulator/compute/SimMachineTest.kt | 211 ++++++++------------- .../compute/kernel/SimFairShareHypervisorTest.kt | 45 ++--- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 63 +++--- .../compute/workload/SimTraceWorkloadTest.kt | 35 +--- 16 files changed, 431 insertions(+), 314 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index a2bb89c2..ca8b912a 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -35,7 +35,7 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcUtils) + implementation(libs.kotlin.logging) testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index cb52d24f..91e91f9d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -76,7 +76,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - return@runBlockingSimulation machine.run(SimTraceWorkload(trace)) + return@runBlockingSimulation machine.runWorkload(SimTraceWorkload(trace)) } } @@ -89,15 +89,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -111,15 +111,15 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) try { - return@runBlockingSimulation vm.run(SimTraceWorkload(trace)) + return@runBlockingSimulation vm.runWorkload(SimTraceWorkload(trace)) } finally { - vm.close() - machine.close() + vm.cancel() + machine.cancel() } } } @@ -133,22 +133,22 @@ class SimMachineBenchmarks { ) val hypervisor = SimFairShareHypervisor(engine, null, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } coroutineScope { repeat(2) { - val vm = hypervisor.createMachine(machineModel) + val vm = hypervisor.newMachine(machineModel) launch { try { - vm.run(SimTraceWorkload(trace)) + vm.runWorkload(SimTraceWorkload(trace)) } finally { - machine.close() + machine.cancel() } } } } - machine.close() + machine.cancel() } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt new file mode 100644 index 00000000..c23f48dc --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.workload.SimWorkload +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. + */ +public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map = emptyMap()) { + return suspendCancellableCoroutine { cont -> + cont.invokeOnCancellation { this@runWorkload.cancel() } + + startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + workload.onStart(ctx) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + try { + workload.onStop(ctx) + + if (!cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + }, + meta + ) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 5909d980..6a4c594d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* +import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral import org.opendc.simulator.compute.model.MachineModel @@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. @@ -71,56 +70,20 @@ public abstract class SimAbstractMachine( */ public override val peripherals: List = net.map { it as SimNetworkAdapter } - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - /** * The current active [Context]. */ private var _ctx: Context? = null - /** - * This method is invoked when the machine is started. - */ - protected open fun onStart(ctx: SimMachineContext) {} - - /** - * This method is invoked when the machine is stopped. - */ - protected open fun onStop(ctx: SimMachineContext) { - _ctx = null - } - - /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map) { - check(!isTerminated) { "Machine is terminated" } + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { check(_ctx == null) { "A machine cannot run concurrently" } - return suspendCancellableCoroutine { cont -> - val ctx = Context(meta, cont) - _ctx = ctx - - // Cancel all cpus on cancellation - cont.invokeOnCancellation { ctx.close() } - - engine.batch { - onStart(ctx) - - workload.onStart(ctx) - } - } + val ctx = Context(workload, meta) + ctx.start() + return ctx } - override fun close() { - if (isTerminated) { - return - } - - isTerminated = true + override fun cancel() { _ctx?.close() } @@ -130,15 +93,33 @@ public abstract class SimAbstractMachine( /** * The execution context in which the workload runs. + * + * @param workload The workload that is running on the machine. + * @param meta The metadata passed to the workload. */ - private inner class Context(override val meta: Map, private val cont: Continuation) : SimMachineContext { + private inner class Context( + private val workload: SimWorkload, + override val meta: Map + ) : SimMachineContext { /** * A flag to indicate that the context has been closed. */ private var isClosed = false - override val engine: FlowEngine - get() = this@SimAbstractMachine.engine + override val engine: FlowEngine = this@SimAbstractMachine.engine + + /** + * Start this context. + */ + fun start() { + try { + _ctx = this + engine.batch { workload.onStart(this) } + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStart callback" } + close() + } + } override val cpus: List = this@SimAbstractMachine.cpus @@ -154,15 +135,43 @@ public abstract class SimAbstractMachine( } isClosed = true + assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" } + _ctx = null + + // Cancel all the resources associated with the machine + doCancel() + + try { + workload.onStop(this) + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStop callback" } + } + } + + /** + * Run the stop procedures for the resources associated with the machine. + */ + private fun doCancel() { engine.batch { for (cpu in cpus) { cpu.cancel() } - } - onStop(this) - cont.resume(Unit) + memory.cancel() + + for (ifx in net) { + (ifx as NetworkAdapterImpl).disconnect() + } + + for (storage in storage) { + val impl = storage as StorageDeviceImpl + impl.read.cancel() + impl.write.cancel() + } + } } + + override fun toString(): String = "SimAbstractMachine.Context" } /** @@ -218,4 +227,12 @@ public abstract class SimAbstractMachine( override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } + + private companion object { + /** + * The logging instance associated with this class. + */ + @JvmStatic + private val logger = KotlinLogging.logger {} + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ab0b56ae..94581e89 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A generic machine that is able to run a [SimWorkload]. */ -public interface SimMachine : AutoCloseable { +public interface SimMachine { /** * The model of the machine containing its specifications. */ @@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable { public val peripherals: List /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Start the specified [SimWorkload] on this machine. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - public suspend fun run(workload: SimWorkload, meta: Map = emptyMap()) + public fun startWorkload(workload: SimWorkload, meta: Map = emptyMap()): SimMachineContext /** - * Terminate this machine. + * Cancel the workload that is currently running on this machine. + * + * If no workload is active, this operation is a no-op. */ - public override fun close() + public fun cancel() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index eda59d2d..07465126 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,6 +28,7 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer @@ -93,13 +94,20 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf() /* SimHypervisor */ - override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } + override fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx @@ -122,6 +130,8 @@ public abstract class SimAbstractHypervisor( } } + override fun onStop(ctx: SimMachineContext) {} + private var _cpuCount = 0 private var _cpuCapacity = 0.0 @@ -145,11 +155,16 @@ public abstract class SimAbstractHypervisor( private inner class VirtualMachine( model: MachineModel, interferenceId: String? = null - ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + /** * The interference key of this virtual machine. */ - private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } + private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. @@ -181,36 +196,60 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) - override fun onStart(ctx: SimMachineContext) { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.join(interferenceKey) - } - - super.onStart(ctx) + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + joinInterferenceDomain() + workload.onStart(ctx) + } catch (cause: Throwable) { + leaveInterferenceDomain() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + leaveInterferenceDomain() + workload.onStop(ctx) + } + }, + meta + ) } - override fun onStop(ctx: SimMachineContext) { - super.onStop(ctx) - - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) + override fun close() { + if (isClosed) { + return } - } - override fun close() { - super.close() + isClosed = true + cancel() for (cpu in cpus) { cpu.close() } + } - _vms.remove(this) + /** + * Join the interference domain of the hypervisor. + */ + private fun joinInterferenceDomain() { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + } + /** + * Leave the interference domain of the hypervisor. + */ + private fun leaveInterferenceDomain() { val interferenceKey = interferenceKey if (interferenceKey != null) { - interferenceDomain?.removeKey(interferenceKey) + interferenceDomain?.leave(interferenceKey) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 57d4cf20..a69f419f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceId An identifier for the interference model. */ - public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + + /** + * Remove the specified [machine] from the hypervisor. + * + * @param machine The machine to remove. + */ + public fun removeMachine(machine: SimVirtualMachine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 99f4a1e1..726d1f56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -48,5 +48,7 @@ public class SimFlopsWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 2ef3bc43..8a3f5f84 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -48,5 +48,7 @@ public class SimRuntimeWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 53c98409..ce04a790 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimTraceWorkload" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index b80665fa..61c6e2ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -37,4 +37,11 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) + + /** + * This method is invoked when the workload is stopped. + * + * @param ctx The execution context in which the machine runs. + */ + public fun onStop(ctx: SimMachineContext) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index cc4f1f6a..742470a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf() + private val waiting = HashSet() /** - * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + * Wait for the specified [source] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: FlowSource): FlowSource { - waiting.add(consumer) - return object : FlowSource by consumer { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - try { - consumer.onStop(conn, now, delta) - } finally { - complete(consumer) - } - } - override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" - } + public fun waitFor(source: FlowSource): FlowSource { + val wrapper = Wrapper(source) + waiting.add(wrapper) + return wrapper } /** - * Complete the specified [FlowSource]. + * Complete the specified [Wrapper]. */ - private fun complete(consumer: FlowSource) { - if (waiting.remove(consumer) && waiting.isEmpty()) { + private fun complete(wrapper: Wrapper) { + if (waiting.remove(wrapper) && waiting.isEmpty()) { ctx.close() } } + + /** + * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that is has completed. + */ + private inner class Wrapper(private val delegate: FlowSource) : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + delegate.onStart(conn, now) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { + complete(this) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]" + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 0bb24ed8..644eb497 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -65,14 +65,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two cores execute 1000 MFlOps per second (1000 ms) - assertEquals(1000, clock.millis()) - } finally { - machine.close() - } + // Two cores execute 1000 MFlOps per second (1000 ms) + assertEquals(1000, clock.millis()) } @Test @@ -88,14 +84,10 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) - // Two sockets with two cores execute 2000 MFlOps per second (500 ms) - assertEquals(500, clock.millis()) - } finally { - machine.close() - } + // Two sockets with two cores execute 2000 MFlOps per second (500 ms) + assertEquals(500, clock.millis()) } @Test @@ -109,16 +101,12 @@ class SimMachineTest { val source = SimPowerSource(engine, capacity = 1000.0) source.connect(machine.psu) - try { - coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - assertAll( - { assertEquals(100.0, machine.psu.powerDraw) }, - { assertEquals(100.0, source.powerDraw) } - ) - } - } finally { - machine.close() + coroutineScope { + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } + assertAll( + { assertEquals(100.0, machine.psu.powerDraw) }, + { assertEquals(100.0, source.powerDraw) } + ) } } @@ -130,22 +118,20 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val cpu = ctx.cpus[0] - - cpu.capacity = cpu.model.frequency + 1000.0 - assertEquals(cpu.model.frequency, cpu.capacity) - cpu.capacity = -1.0 - assertEquals(0.0, cpu.capacity) - - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val cpu = ctx.cpus[0] + + cpu.capacity = cpu.model.frequency + 1000.0 + assertEquals(cpu.model.frequency, cpu.capacity) + cpu.capacity = -1.0 + assertEquals(0.0, cpu.capacity) + + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -156,16 +142,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - assertEquals(32_000 * 4.0, ctx.memory.capacity) - ctx.close() - } - }) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + assertEquals(32_000 * 4.0, ctx.memory.capacity) + ctx.close() + } + + override fun onStop(ctx: SimMachineContext) {} + }) } @Test @@ -176,18 +160,16 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -202,19 +184,17 @@ class SimMachineTest { val adapter = (machine.peripherals[0] as SimNetworkAdapter) adapter.connect(SimNetworkSink(engine, adapter.bandwidth)) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val iface = ctx.net[0] - iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val iface = ctx.net[0] + iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -226,19 +206,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -250,19 +228,17 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - machine.run(object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - val lifecycle = SimWorkloadLifecycle(ctx) - val disk = ctx.storage[0] - disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) - } - }) - - assertEquals(1250, clock.millis()) - } finally { - machine.close() - } + machine.runWorkload(object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + val lifecycle = SimWorkloadLifecycle(ctx) + val disk = ctx.storage[0] + disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) + } + + override fun onStop(ctx: SimMachineContext) {} + }) + + assertEquals(1250, clock.millis()) } @Test @@ -275,13 +251,11 @@ class SimMachineTest { try { coroutineScope { - launch { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } + launch { machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } cancel() } } catch (_: CancellationException) { // Ignore - } finally { - machine.close() } assertEquals(0, clock.millis()) @@ -295,31 +269,14 @@ class SimMachineTest { SimplePowerDriver(ConstantPowerModel(0.0)) ) - try { - coroutineScope { - launch { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + coroutineScope { + launch { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) + } - assertThrows { - machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) - } + assertThrows { + machine.runWorkload(SimFlopsWorkload(2_000, utilization = 1.0)) } - } finally { - machine.close() } } - - @Test - fun testClose() = runBlockingSimulation { - val machine = SimBareMetalMachine( - FlowEngine(coroutineContext, clock), - machineModel, - SimplePowerDriver(ConstantPowerModel(0.0)) - ) - - machine.close() - assertDoesNotThrow { machine.close() } - assertThrows { machine.run(SimFlopsWorkload(2_000, utilization = 1.0)) } - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index b7f5bf8e..91855e8d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -37,6 +37,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -80,16 +81,16 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) println("Hypervisor finished") } yield() - val vm = hypervisor.createMachine(model) - vm.run(workloadA) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) yield() - machine.close() + machine.cancel() assertAll( { assertEquals(319781, hypervisor.counters.cpuActiveTime, "Active time does not match") }, @@ -131,22 +132,22 @@ internal class SimFairShareHypervisorTest { val hypervisor = SimFairShareHypervisor(platform, null, null, null) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } yield() coroutineScope { launch { - val vm = hypervisor.createMachine(model) - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model) - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } yield() - machine.close() + machine.cancel() yield() assertAll( @@ -171,11 +172,11 @@ internal class SimFairShareHypervisorTest { assertDoesNotThrow { launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } } - machine.close() + machine.cancel() } @Test @@ -219,20 +220,20 @@ internal class SimFairShareHypervisorTest { ) launch { - machine.run(hypervisor) + machine.runWorkload(hypervisor) } coroutineScope { launch { - val vm = hypervisor.createMachine(model, "a") - vm.run(workloadA) - vm.close() + val vm = hypervisor.newMachine(model, "a") + vm.runWorkload(workloadA) + hypervisor.removeMachine(vm) } - val vm = hypervisor.createMachine(model, "b") - vm.run(workloadB) - vm.close() + val vm = hypervisor.newMachine(model, "b") + vm.runWorkload(workloadB) + hypervisor.removeMachine(vm) } - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 02d308ff..823a0ae3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -36,6 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -76,13 +77,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } - val vm = hypervisor.createMachine(machineModel) - vm.run(workloadA) + launch { machine.runWorkload(hypervisor) } + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workloadA) yield() - vm.close() - machine.close() + hypervisor.removeMachine(vm) + machine.cancel() assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } @@ -98,12 +99,13 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - vm.close() - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + hypervisor.removeMachine(vm) + + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -121,11 +123,11 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(workload) - machine.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(workload) + machine.cancel() assertEquals(duration, clock.millis()) { "Took enough time" } } @@ -142,19 +144,20 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - val vm = hypervisor.createMachine(machineModel) - vm.run(SimRuntimeWorkload(duration)) - vm.close() + val vm = hypervisor.newMachine(machineModel) + vm.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm) yield() - val vm2 = hypervisor.createMachine(machineModel) - vm2.run(SimRuntimeWorkload(duration)) - vm2.close() - machine.close() + val vm2 = hypervisor.newMachine(machineModel) + vm2.runWorkload(SimRuntimeWorkload(duration)) + hypervisor.removeMachine(vm2) + + machine.cancel() assertEquals(duration * 2, clock.millis()) { "Took enough time" } } @@ -168,17 +171,17 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimSpaceSharedHypervisor(engine, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel) + hypervisor.newMachine(machineModel) assertAll( { assertFalse(hypervisor.canFit(machineModel)) }, - { assertThrows { hypervisor.createMachine(machineModel) } } + { assertThrows { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } /** @@ -192,16 +195,16 @@ internal class SimSpaceSharedHypervisorTest { ) val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) - launch { machine.run(hypervisor) } + launch { machine.runWorkload(hypervisor) } yield() - hypervisor.createMachine(machineModel).close() + hypervisor.removeMachine(hypervisor.newMachine(machineModel)) assertAll( { assertTrue(hypervisor.canFit(machineModel)) }, - { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + { assertDoesNotThrow { hypervisor.newMachine(machineModel) } } ) - machine.close() + machine.cancel() } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 574860e8..aa91984a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -30,6 +30,7 @@ import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -67,13 +68,9 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -94,13 +91,9 @@ class SimTraceWorkloadTest { offset = 1000 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(5000, clock.millis()) - } finally { - machine.close() - } + assertEquals(5000, clock.millis()) } @Test @@ -121,14 +114,10 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - delay(1000L) - machine.run(workload) + delay(1000L) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } @Test @@ -149,12 +138,8 @@ class SimTraceWorkloadTest { offset = 0 ) - try { - machine.run(workload) + machine.runWorkload(workload) - assertEquals(4000, clock.millis()) - } finally { - machine.close() - } + assertEquals(4000, clock.millis()) } } -- cgit v1.2.3