From 06b19fbf17b9e6d8024ba36e0f2533b2db0dd7de Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 2 Sep 2022 22:01:13 +0200 Subject: refactor(sim/compute): Move VM interference model into compute simulator This change moves the core of the VM interference model from the flow module into the compute simulator. This logic can be contained in the compute simulator and does not need to leak into the flow-level simulator. --- .../experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../compute/kernel/SimAbstractHypervisor.kt | 84 ++++++++++++++++++---- .../compute/kernel/SimFairShareHypervisor.kt | 2 +- .../kernel/interference/VmInterferenceDomain.kt | 25 ++++--- .../kernel/interference/VmInterferenceKey.kt | 28 ++++++++ .../kernel/interference/VmInterferenceModel.kt | 13 ++-- .../compute/kernel/SimFairShareHypervisorTest.kt | 1 - .../org/opendc/simulator/flow/FlowCounters.kt | 5 -- .../org/opendc/simulator/flow/FlowForwarder.kt | 2 +- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 2 +- .../flow/interference/InterferenceDomain.kt | 19 ----- .../simulator/flow/interference/InterferenceKey.kt | 28 -------- .../simulator/flow/internal/MutableFlowCounters.kt | 9 +-- .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 8 +-- .../flow/mux/ForwardingFlowMultiplexer.kt | 7 +- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 37 +++------- 16 files changed, 140 insertions(+), 132 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index d7b7caad..368b0086 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -209,7 +209,7 @@ class CapelinIntegrationTest { { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(477068, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 8e925bdf..9495095e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -30,7 +30,7 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -144,6 +144,10 @@ public abstract class SimAbstractHypervisor( if (delta > 0) { _counters.record() + + for (vm in _vms) { + vm._counters.record() + } } val load = cpuDemand / cpuCapacity @@ -171,19 +175,19 @@ public abstract class SimAbstractHypervisor( /** * The interference key of this virtual machine. */ - private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } + private val interferenceKey: VmInterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } /** * The resource counters associated with the hypervisor. */ override val counters: SimHypervisorCounters get() = _counters - private val _counters = VmCountersImpl(cpus) + @JvmField val _counters = VmCountersImpl(cpus, interferenceDomain, interferenceKey) /** * The CPU capacity of the hypervisor in MHz. @@ -317,8 +321,8 @@ public abstract class SimAbstractHypervisor( override val cpuLostTime: Long get() = _cpuTime[3] - private val _cpuTime = LongArray(4) - private val _previous = DoubleArray(4) + val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) /** * Record the CPU time of the hypervisor. @@ -331,22 +335,18 @@ public abstract class SimAbstractHypervisor( val demand = counters.demand val actual = counters.actual val remaining = counters.remaining - val interference = counters.interference val demandDelta = demand - previous[0] val actualDelta = actual - previous[1] val remainingDelta = remaining - previous[2] - val interferenceDelta = interference - previous[3] previous[0] = demand previous[1] = actual previous[2] = remaining - previous[3] = interference cpuTime[0] += (actualDelta * d).roundToLong() cpuTime[1] += (remainingDelta * d).roundToLong() cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - cpuTime[3] += (interferenceDelta * d).roundToLong() } override fun flush() { @@ -358,17 +358,71 @@ public abstract class SimAbstractHypervisor( /** * A [SimHypervisorCounters] implementation for a virtual machine. */ - private class VmCountersImpl(private val cpus: List) : SimHypervisorCounters { + private inner class VmCountersImpl( + private val cpus: List, + private val interferenceDomain: VmInterferenceDomain?, + private val key: VmInterferenceKey? + ) : SimHypervisorCounters { private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + get() = _cpuTime[0] override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong() + get() = _cpuTime[1] override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + get() = _cpuTime[2] override val cpuLostTime: Long - get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + + var demand = 0.0 + var actual = 0.0 + var remaining = 0.0 + + for (cpu in cpus) { + val counters = cpu.counters + + actual += counters.actual + demand += counters.demand + remaining += counters.remaining + } + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + val d = d + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + + // Compute the performance penalty due to flow interference + val interferenceDomain = interferenceDomain + if (interferenceDomain != null) { + val mux = mux + val load = mux.rate / mux.capacity.coerceAtLeast(1.0) + val penalty = 1 - interferenceDomain.apply(key, load) + val interference = (actualDelta * d * penalty).roundToLong() + + if (interference > 0) { + cpuTime[3] += interference + _counters._cpuTime[3] += interference + } + } + } override fun flush() { for (cpu in cpus) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 36f76650..f6a700b9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -50,7 +50,7 @@ public class SimFairShareHypervisor( /** * The multiplexer that distributes the computing capacity. */ - override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain) + override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this) override fun canFit(model: MachineModel): Boolean = true } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 09b03306..5220fa2d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -22,38 +22,45 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.flow.interference.InterferenceDomain -import org.opendc.simulator.flow.interference.InterferenceKey - /** * The interference domain of a hypervisor. */ -public interface VmInterferenceDomain : InterferenceDomain { +public interface VmInterferenceDomain { + /** + * Compute the performance score of a participant in this interference domain. + * + * @param key The participant to obtain the score of or `null` if the participant has no key. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the resource consumer, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(key: VmInterferenceKey?, load: Double): Double + /** - * Construct an [InterferenceKey] for the specified [id]. + * Construct an [VmInterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine * does not participate in the domain. */ - public fun createKey(id: String): InterferenceKey? + public fun createKey(id: String): VmInterferenceKey? /** * Remove the specified [key] from this domain. */ - public fun removeKey(key: InterferenceKey) + public fun removeKey(key: VmInterferenceKey) /** * Mark the specified [key] as active in this interference domain. * * @param key The key to join the interference domain with. */ - public fun join(key: InterferenceKey) + public fun join(key: VmInterferenceKey) /** * Mark the specified [key] as inactive in this interference domain. * * @param key The key of the virtual machine that wants to leave the domain. */ - public fun leave(key: InterferenceKey) + public fun leave(key: VmInterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt new file mode 100644 index 00000000..8d720ea9 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +/** + * A key that uniquely identifies a participant of an interference domain. + */ +public interface VmInterferenceKey diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index 977292be..7cc545c8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.flow.interference.InterferenceKey import java.util.* /** @@ -215,12 +214,12 @@ public class VmInterferenceModel private constructor( */ private val activeKeys = ArrayList() - override fun createKey(id: String): InterferenceKey? { + override fun createKey(id: String): VmInterferenceKey? { val intId = idMapping[id] ?: return null return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } } - override fun removeKey(key: InterferenceKey) { + override fun removeKey(key: VmInterferenceKey) { if (key !is InterferenceKeyImpl) { return } @@ -232,7 +231,7 @@ public class VmInterferenceModel private constructor( keys.remove(key.id) } - override fun join(key: InterferenceKey) { + override fun join(key: VmInterferenceKey) { if (key !is InterferenceKeyImpl) { return } @@ -246,14 +245,14 @@ public class VmInterferenceModel private constructor( } } - override fun leave(key: InterferenceKey) { + override fun leave(key: VmInterferenceKey) { if (key is InterferenceKeyImpl && key.release()) { activeKeys.remove(key) computeActiveGroups(key.id) } } - override fun apply(key: InterferenceKey?, load: Double): Double { + override fun apply(key: VmInterferenceKey?, load: Double): Double { if (key == null || key !is InterferenceKeyImpl) { return 1.0 } @@ -367,7 +366,7 @@ public class VmInterferenceModel private constructor( * * @param id The identifier of the member. */ - private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable { + private class InterferenceKeyImpl(@JvmField val id: Int) : VmInterferenceKey, Comparable { /** * The active groups to which the key belongs. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 91855e8d..dd5bfc33 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -47,7 +47,6 @@ import org.opendc.simulator.flow.FlowEngine /** * Test suite for the [SimHypervisor] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimFairShareHypervisorTest { private lateinit var model: MachineModel diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index a717ae6e..d8ad7978 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -41,11 +41,6 @@ public interface FlowCounters { */ public val remaining: Double - /** - * The accumulated flow lost due to interference between sources. - */ - public val interference: Double - /** * Reset the flow counters. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 0ad18f6a..6fa2971a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -258,6 +258,6 @@ public class FlowForwarder( val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.increment(work, actualWork, (total - actualWork), 0.0) + counters.increment(work, actualWork, (total - actualWork)) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index d0324ce8..ee8cd739 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -154,7 +154,7 @@ public class FlowSink( val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.increment(work, actualWork, (total - actualWork), 0.0) + counters.increment(work, actualWork, (total - actualWork)) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt deleted file mode 100644 index aa2713b6..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt +++ /dev/null @@ -1,19 +0,0 @@ -package org.opendc.simulator.flow.interference - -import org.opendc.simulator.flow.FlowSource - -/** - * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur - * performance variability due to operating on the same resource and therefore causing interference. - */ -public interface InterferenceDomain { - /** - * Compute the performance score of a participant in this interference domain. - * - * @param key The participant to obtain the score of or `null` if the participant has no key. - * @param load The overall load on the interference domain. - * @return A score representing the performance score to be applied to the resource consumer, with 1 - * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. - */ - public fun apply(key: InterferenceKey?, load: Double): Double -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt deleted file mode 100644 index d28ebde5..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.interference - -/** - * A key that uniquely identifies a participant of an interference domain. - */ -public interface InterferenceKey diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt index d990dc61..c320a362 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -34,23 +34,20 @@ public class MutableFlowCounters : FlowCounters { get() = _counters[1] override val remaining: Double get() = _counters[2] - override val interference: Double - get() = _counters[3] - private val _counters = DoubleArray(4) + private val _counters = DoubleArray(3) override fun reset() { _counters.fill(0.0) } - public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + public fun increment(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters[0] += demand counters[1] += actual counters[2] += remaining - counters[3] += interference } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 5f198944..3b4583e2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.FlowConsumer import org.opendc.simulator.flow.FlowCounters import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.interference.InterferenceKey /** * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. @@ -63,18 +62,15 @@ public interface FlowMultiplexer { /** * Create a new input on this multiplexer with a coupled capacity. - * - * @param key The key of the interference member to which the input belongs. */ - public fun newInput(key: InterferenceKey? = null): FlowConsumer + public fun newInput(): FlowConsumer /** * Create a new input on this multiplexer with the specified [capacity]. * * @param capacity The capacity of the input. - * @param key The key of the interference member to which the input belongs. */ - public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + public fun newInput(capacity: Double): FlowConsumer /** * Remove [input] from this multiplexer. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 1d7d22ef..c18b2701 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque /** @@ -54,8 +53,6 @@ public class ForwardingFlowMultiplexer( get() = _outputs.sumOf { it.forwarder.counters.actual } override val remaining: Double get() = _outputs.sumOf { it.forwarder.counters.remaining } - override val interference: Double - get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { for (output in _outputs) { @@ -75,14 +72,14 @@ public class ForwardingFlowMultiplexer( override val capacity: Double get() = _outputs.sumOf { it.forwarder.capacity } - override fun newInput(key: InterferenceKey?): FlowConsumer { + override fun newInput(): FlowConsumer { val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } val input = Input(output) _inputs += input return input } - override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun newInput(capacity: Double): FlowConsumer = newInput() override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index cc831862..7605f67d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -23,11 +23,8 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceDomain -import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max import kotlin.math.min /** @@ -35,12 +32,10 @@ import kotlin.math.min * * @param engine The [FlowEngine] to drive the flow simulation. * @param parent The parent flow system of the multiplexer. - * @param interferenceDomain The interference domain of the multiplexer. */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - parent: FlowConvergenceListener? = null, - private val interferenceDomain: InterferenceDomain? = null + parent: FlowConvergenceListener? = null ) : FlowMultiplexer { /** * The inputs of the multiplexer. @@ -85,16 +80,16 @@ public class MaxMinFlowMultiplexer( */ private val scheduler = Scheduler(engine, parent) - override fun newInput(key: InterferenceKey?): FlowConsumer { - return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + override fun newInput(): FlowConsumer { + return newInput(isCoupled = true, Double.POSITIVE_INFINITY) } - override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { - return newInput(isCoupled = false, capacity, key) + override fun newInput(capacity: Double): FlowConsumer { + return newInput(isCoupled = false, capacity) } - private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) + private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer { + val provider = Input(engine, scheduler, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -499,8 +494,7 @@ public class MaxMinFlowMultiplexer( counters.increment( demand = demand * deltaS, actual = rate * deltaS, - remaining = (previousCapacity - rate) * deltaS, - interference = 0.0 + remaining = (previousCapacity - rate) * deltaS ) } } @@ -511,8 +505,6 @@ public class MaxMinFlowMultiplexer( private class Input( private val engine: FlowEngine, private val scheduler: Scheduler, - private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey?, @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable { @@ -693,24 +685,15 @@ public class MaxMinFlowMultiplexer( return } - // Compute the performance penalty due to flow interference - val perfScore = if (interferenceDomain != null) { - val load = scheduler.rate / scheduler.capacity - interferenceDomain.apply(key, load) - } else { - 1.0 - } - val actualRate = actualRate val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS val remaining = (_capacity - actualRate) * deltaS - val interference = actual * max(0.0, 1 - perfScore) - _counters.increment(demand, actual, remaining, interference) - scheduler.counters.increment(0.0, 0.0, 0.0, interference) + _counters.increment(demand, actual, remaining) + scheduler.counters.increment(0.0, 0.0, 0.0) } } -- cgit v1.2.3 From c9d4b846dfa7bf71bc394761717cb165951fa790 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 2 Sep 2022 22:10:21 +0200 Subject: refactor(sim/compute): Remove convergence listener parameter This change removes the convergence listener parameter in for the `SimBareMetalMachine` and the hypervisors. This parameter was not used in the code-base and is being removed with the introduction of the new flow2 module. --- .../org/opendc/simulator/compute/SimMachineBenchmarks.kt | 7 +++---- .../org/opendc/simulator/compute/SimAbstractMachine.kt | 6 +----- .../org/opendc/simulator/compute/SimBareMetalMachine.kt | 4 +--- .../opendc/simulator/compute/kernel/SimAbstractHypervisor.kt | 7 ++----- .../simulator/compute/kernel/SimFairShareHypervisor.kt | 4 +--- .../compute/kernel/SimFairShareHypervisorProvider.kt | 4 +--- .../opendc/simulator/compute/kernel/SimHypervisorProvider.kt | 2 -- .../simulator/compute/kernel/SimSpaceSharedHypervisor.kt | 4 +--- .../compute/kernel/SimSpaceSharedHypervisorProvider.kt | 4 +--- .../simulator/compute/kernel/SimFairShareHypervisorTest.kt | 8 ++++---- .../simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt | 12 ++++++------ 11 files changed, 21 insertions(+), 41 deletions(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 91e91f9d..f257ebb3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit @Fork(1) @Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var machineModel: MachineModel private lateinit var trace: SimTrace @@ -87,7 +86,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimSpaceSharedHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } @@ -109,7 +108,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine, null, null, null) + val hypervisor = SimFairShareHypervisor(engine, null, null) launch { machine.runWorkload(hypervisor) } @@ -131,7 +130,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine, null, null, null) + val hypervisor = SimFairShareHypervisor(engine, null, null) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index e14ea507..7f6766ee 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -36,12 +36,10 @@ import org.opendc.simulator.flow.* * Abstract implementation of the [SimMachine] interface. * * @param engine The engine to manage the machine's resources. - * @param parent The parent simulation system. * @param model The model of the machine. */ public abstract class SimAbstractMachine( protected val engine: FlowEngine, - private val parent: FlowConvergenceListener?, final override val model: MachineModel ) : SimMachine, FlowConvergenceListener { /** @@ -86,9 +84,7 @@ public abstract class SimAbstractMachine( _ctx?.close() } - override fun onConverge(now: Long) { - parent?.onConverge(now) - } + override fun onConverge(now: Long) {} /** * The execution context in which the workload runs. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 63a82e78..0df897b6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -40,15 +40,13 @@ import kotlin.math.max * @param model The machine model to simulate. * @param powerDriver The power driver to use. * @param psu The power supply of the machine. - * @param parent The parent simulation system. */ public class SimBareMetalMachine( engine: FlowEngine, model: MachineModel, powerDriver: PowerDriver, public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)), - parent: FlowConvergenceListener? = null, -) : SimAbstractMachine(engine, parent, model) { +) : SimAbstractMachine(engine, model) { /** * The current power usage of the machine (without PSU loss) in W. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 9495095e..b98647e7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -26,11 +26,11 @@ import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -42,7 +42,6 @@ import kotlin.math.roundToLong */ public abstract class SimAbstractHypervisor( protected val engine: FlowEngine, - private val listener: FlowConvergenceListener?, private val scalingGovernor: ScalingGovernor?, protected val interferenceDomain: VmInterferenceDomain? = null ) : SimHypervisor, FlowConvergenceListener { @@ -154,8 +153,6 @@ public abstract class SimAbstractHypervisor( for (governor in governors) { governor.onLimit(load) } - - listener?.onConverge(now) } /** @@ -166,7 +163,7 @@ public abstract class SimAbstractHypervisor( private inner class VirtualMachine( model: MachineModel, interferenceId: String? = null - ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { + ) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { /** * A flag to indicate that the machine is closed. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index f6a700b9..a12c5517 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -27,7 +27,6 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.flow.mux.FlowMultiplexer import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer @@ -43,10 +42,9 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer */ public class SimFairShareHypervisor( engine: FlowEngine, - listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, -) : SimAbstractHypervisor(engine, listener, scalingGovernor, interferenceDomain) { +) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) { /** * The multiplexer that distributes the computing capacity. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index 3136f4c8..204b4860 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine /** @@ -35,8 +34,7 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, - listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, - ): SimHypervisor = SimFairShareHypervisor(engine, listener, scalingGovernor, interferenceDomain) + ): SimHypervisor = SimFairShareHypervisor(engine, scalingGovernor, interferenceDomain) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index 483217af..b7e8760a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine /** @@ -44,7 +43,6 @@ public interface SimHypervisorProvider { */ public fun create( engine: FlowEngine, - listener: FlowConvergenceListener? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, ): SimHypervisor diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 3f3bf6ad..7976077c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.flow.mux.FlowMultiplexer import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer @@ -34,9 +33,8 @@ import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer */ public class SimSpaceSharedHypervisor( engine: FlowEngine, - listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, -) : SimAbstractHypervisor(engine, listener, scalingGovernor) { +) : SimAbstractHypervisor(engine, scalingGovernor) { override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) override fun canFit(model: MachineModel): Boolean { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index dd6fb0b1..96b73e69 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine /** @@ -35,8 +34,7 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, - listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, - ): SimHypervisor = SimSpaceSharedHypervisor(engine, listener, scalingGovernor) + ): SimHypervisor = SimSpaceSharedHypervisor(engine, scalingGovernor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index dd5bfc33..5f3c3b17 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -77,7 +77,7 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) + val hypervisor = SimFairShareHypervisor(platform, PerformanceScalingGovernor(), null) launch { machine.runWorkload(hypervisor) @@ -128,7 +128,7 @@ internal class SimFairShareHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null, null, null) + val hypervisor = SimFairShareHypervisor(platform, null, null) launch { machine.runWorkload(hypervisor) @@ -167,7 +167,7 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null, null, null) + val hypervisor = SimFairShareHypervisor(platform, null, null) assertDoesNotThrow { launch { @@ -196,7 +196,7 @@ internal class SimFairShareHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null, null, interferenceModel.newDomain()) + val hypervisor = SimFairShareHypervisor(platform, null, interferenceModel.newDomain()) val duration = 5 * 60L val workloadA = diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 823a0ae3..0f533130 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimSpaceSharedHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } val vm = hypervisor.newMachine(machineModel) @@ -97,7 +97,7 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimRuntimeWorkload(duration) val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimSpaceSharedHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } yield() @@ -121,7 +121,7 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimSpaceSharedHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } yield() @@ -142,7 +142,7 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimSpaceSharedHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } yield() @@ -169,7 +169,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimSpaceSharedHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } yield() @@ -193,7 +193,7 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) + val hypervisor = SimSpaceSharedHypervisor(interpreter, null) launch { machine.runWorkload(hypervisor) } yield() -- cgit v1.2.3 From d65978b5f3426280d1dd64105e8442f5d5d98b9e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 2 Sep 2022 22:12:41 +0200 Subject: refactor(sim/compute): Remove FlowEngine from SimMachineContext This change removes the reference to the active FlowEngine from the SimMachineContext interface. This prevents leaking the lower layer into this module. --- .../main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt | 2 +- .../main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 7f6766ee..ef0cd323 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -101,7 +101,7 @@ public abstract class SimAbstractMachine( */ private var isClosed = false - override val engine: FlowEngine = this@SimAbstractMachine.engine + val engine: FlowEngine = this@SimAbstractMachine.engine /** * Start this context. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 1317f728..5e3a7766 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,19 +22,12 @@ package org.opendc.simulator.compute -import org.opendc.simulator.flow.FlowEngine - /** * A simulated execution context in which a bootable image runs. This interface represents the * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on * which the image runs. */ public interface SimMachineContext : AutoCloseable { - /** - * The [FlowEngine] that simulates the machine. - */ - public val engine: FlowEngine - /** * The metadata associated with the context. */ -- cgit v1.2.3 From a7510e0708b6e5435f8440e588c762d6e6cd8a22 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 2 Sep 2022 22:18:12 +0200 Subject: refactor(sim/compute): Remove timestamp parameter from SimTrace This change removes the timestamp parameter from `SimTrace`. Instead, it is now assumed that the trace is continuous and the end of a fragment starts a new fragment, in order to simplify replaying of the trace. --- .../compute/workload/ComputeWorkloadLoader.kt | 14 +++++- .../simulator/compute/SimMachineBenchmarks.kt | 3 +- .../opendc/simulator/compute/workload/SimTrace.kt | 53 +++------------------- 3 files changed, 21 insertions(+), 49 deletions(-) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 12c2325a..7ed04994 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -222,6 +222,11 @@ public class ComputeWorkloadLoader(private val baseDir: File) { */ private val builder = SimTrace.builder() + /** + * The deadline of the previous fragment. + */ + private var previousDeadline = Long.MIN_VALUE + /** * Add a fragment to the trace. * @@ -233,7 +238,14 @@ public class ComputeWorkloadLoader(private val baseDir: File) { fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { val duration = max(0, deadline - timestamp) totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs - builder.add(timestamp, deadline, usage, cores) + + if (timestamp != previousDeadline) { + // There is a gap between the previous and current fragment; fill the gap + builder.add(timestamp, 0.0, cores) + } + + builder.add(deadline, usage, cores) + previousDeadline = deadline } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index f257ebb3..fbcc99c3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import org.opendc.simulator.compute.kernel.SimFairShareHypervisor @@ -63,7 +62,7 @@ class SimMachineBenchmarks { repeat(10000) { val timestamp = it.toLong() val deadline = timestamp + 1000 - builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1) + builder.add(deadline, random.nextDouble(0.0, 4500.0), 1) } trace = builder.build() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt index 6b820e5d..e66227b5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -31,14 +31,12 @@ import kotlin.math.min * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s. * * @param usageCol The column containing the CPU usage of each fragment (in MHz). - * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis). * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis). * @param coresCol The column containing the utilized cores. * @param size The number of fragments in the trace. */ public class SimTrace( private val usageCol: DoubleArray, - private val timestampCol: LongArray, private val deadlineCol: LongArray, private val coresCol: IntArray, private val size: Int, @@ -46,7 +44,6 @@ public class SimTrace( init { require(size >= 0) { "Invalid trace size" } require(usageCol.size >= size) { "Invalid number of usage entries" } - require(timestampCol.size >= size) { "Invalid number of timestamp entries" } require(deadlineCol.size >= size) { "Invalid number of deadline entries" } require(coresCol.size >= size) { "Invalid number of core entries" } } @@ -59,19 +56,17 @@ public class SimTrace( public fun ofFragments(fragments: List): SimTrace { val size = fragments.size val usageCol = DoubleArray(size) - val timestampCol = LongArray(size) val deadlineCol = LongArray(size) val coresCol = IntArray(size) for (i in fragments.indices) { val fragment = fragments[i] usageCol[i] = fragment.usage - timestampCol[i] = fragment.timestamp deadlineCol[i] = fragment.timestamp + fragment.duration coresCol[i] = fragment.cores } - return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + return SimTrace(usageCol, deadlineCol, coresCol, size) } /** @@ -81,19 +76,17 @@ public class SimTrace( public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace { val size = fragments.size val usageCol = DoubleArray(size) - val timestampCol = LongArray(size) val deadlineCol = LongArray(size) val coresCol = IntArray(size) for (i in fragments.indices) { val fragment = fragments[i] usageCol[i] = fragment.usage - timestampCol[i] = fragment.timestamp deadlineCol[i] = fragment.timestamp + fragment.duration coresCol[i] = fragment.cores } - return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + return SimTrace(usageCol, deadlineCol, coresCol, size) } /** @@ -108,25 +101,9 @@ public class SimTrace( * * @param cpu The [ProcessingUnit] for which to create the source. * @param offset The time offset to use for the trace. - * @param fillMode The [FillMode] for filling missing data. */ - public fun newSource(cpu: ProcessingUnit, offset: Long, fillMode: FillMode = FillMode.None): FlowSource { - return CpuConsumer(cpu, offset, fillMode, usageCol, timestampCol, deadlineCol, coresCol, size) - } - - /** - * An enumeration describing the modes for filling missing data. - */ - public enum class FillMode { - /** - * When a gap in the trace data occurs, the CPU usage will be set to zero. - */ - None, - - /** - * When a gap in the trace data occurs, the previous CPU usage will be used. - */ - Previous + public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource { + return CpuConsumer(cpu, offset, usageCol, deadlineCol, coresCol, size) } /** @@ -137,7 +114,6 @@ public class SimTrace( * The columns of the trace. */ private var usageCol: DoubleArray = DoubleArray(16) - private var timestampCol: LongArray = LongArray(16) private var deadlineCol: LongArray = LongArray(16) private var coresCol: IntArray = IntArray(16) @@ -150,25 +126,23 @@ public class SimTrace( * Add the specified [SimTraceFragment] to the trace. */ public fun add(fragment: SimTraceFragment) { - add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores) + add(fragment.timestamp + fragment.duration, fragment.usage, fragment.cores) } /** * Add a fragment to the trace. * - * @param timestamp Timestamp at which the fragment starts (in epoch millis). * @param deadline Timestamp at which the fragment ends (in epoch millis). * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ - public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + public fun add(deadline: Long, usage: Double, cores: Int) { val size = size if (size == usageCol.size) { grow() } - timestampCol[size] = timestamp deadlineCol[size] = deadline usageCol[size] = usage coresCol[size] = cores @@ -184,7 +158,6 @@ public class SimTrace( val newSize = arraySize + (arraySize shr 1) usageCol = usageCol.copyOf(newSize) - timestampCol = timestampCol.copyOf(newSize) deadlineCol = deadlineCol.copyOf(newSize) coresCol = coresCol.copyOf(newSize) } @@ -193,7 +166,7 @@ public class SimTrace( * Construct the immutable [SimTrace]. */ public fun build(): SimTrace { - return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + return SimTrace(usageCol, deadlineCol, coresCol, size) } } @@ -203,9 +176,7 @@ public class SimTrace( private class CpuConsumer( cpu: ProcessingUnit, private val offset: Long, - private val fillMode: FillMode, private val usageCol: DoubleArray, - private val timestampCol: LongArray, private val deadlineCol: LongArray, private val coresCol: IntArray, private val size: Int @@ -236,16 +207,6 @@ public class SimTrace( } _idx = idx - val timestamp = timestampCol[idx] - - // There is a gap in the trace, since the next fragment starts in the future. - if (timestamp > nowOffset) { - when (fillMode) { - FillMode.None -> conn.push(0.0) // Reset rate to zero - FillMode.Previous -> {} // Keep previous rate - } - return timestamp - nowOffset - } val cores = min(coreCount, coresCol[idx]) val usage = usageCol[idx] -- cgit v1.2.3 From 2ec5e8f1e44239916779655d4d68a9c6dae8e894 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 5 Sep 2022 19:20:14 +0200 Subject: refactor(sim/compute): Pass interference key via parameter This change updates the signature of the `SimHypervisor` interface to accept a `VmInterferenceKey` when creating a new virtual machine, instead of providing a string identifier. This is in preparation for removing the dependency on the `VmInterferenceModel` in the `SimAbstractHypervisor` class. --- .../src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 5 +++-- .../opendc/simulator/compute/kernel/SimAbstractHypervisor.kt | 12 ++++-------- .../org/opendc/simulator/compute/kernel/SimHypervisor.kt | 5 +++-- .../simulator/compute/kernel/SimFairShareHypervisorTest.kt | 7 ++++--- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index c28239b4..908353f0 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -65,7 +65,7 @@ public class SimHost( scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - interferenceDomain: VmInterferenceDomain? = null, + private val interferenceDomain: VmInterferenceDomain? = null, private val optimize: Boolean = false ) : Host, AutoCloseable { /** @@ -144,7 +144,8 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val machine = hypervisor.newMachine(key.flavor.toMachineModel(), key.name) + val interferenceKey = interferenceDomain?.createKey(key.name) + val machine = hypervisor.newMachine(key.flavor.toMachineModel(), interferenceKey) val newGuest = Guest( scope.coroutineContext, clock, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index b98647e7..b3898004 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -93,9 +93,9 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf() /* SimHypervisor */ - override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceKey: VmInterferenceKey?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model, interferenceId) + val vm = VirtualMachine(model, interferenceKey) _vms.add(vm) return vm } @@ -159,21 +159,17 @@ public abstract class SimAbstractHypervisor( * A virtual machine running on the hypervisor. * * @param model The machine model of the virtual machine. + * @param interferenceKey The interference key of this virtual machine. */ private inner class VirtualMachine( model: MachineModel, - interferenceId: String? = null + private val interferenceKey: VmInterferenceKey? = null ) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { /** * A flag to indicate that the machine is closed. */ private var isClosed = false - /** - * The interference key of this virtual machine. - */ - private val interferenceKey: VmInterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } - /** * The vCPUs of the machine. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index a69f419f..229e569c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload @@ -65,9 +66,9 @@ public interface SimHypervisor : SimWorkload { * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. - * @param interferenceId An identifier for the interference model. + * @param interferenceKey The key of the machine in the interference model. */ - public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceKey: VmInterferenceKey? = null): SimVirtualMachine /** * Remove the specified [machine] from the hypervisor. diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 5f3c3b17..ab2a6d76 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -191,12 +191,13 @@ internal class SimFairShareHypervisorTest { .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) .build() + val interferenceDomain = interferenceModel.newDomain() val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null, interferenceModel.newDomain()) + val hypervisor = SimFairShareHypervisor(platform, null, interferenceDomain) val duration = 5 * 60L val workloadA = @@ -224,11 +225,11 @@ internal class SimFairShareHypervisorTest { coroutineScope { launch { - val vm = hypervisor.newMachine(model, "a") + val vm = hypervisor.newMachine(model, interferenceDomain.createKey("a")) vm.runWorkload(workloadA) hypervisor.removeMachine(vm) } - val vm = hypervisor.newMachine(model, "b") + val vm = hypervisor.newMachine(model, interferenceDomain.createKey("b")) vm.runWorkload(workloadB) hypervisor.removeMachine(vm) } -- cgit v1.2.3 From 35ec0060fb73149e687655851a682f12486f0086 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 5 Sep 2022 20:23:45 +0200 Subject: refactor(sim/compute): Move interference logic into VmInterferenceMember This change updates the design of the VM interference model, where we move more of the logic into the `VmInterferenceMember` interface. This removes the dependency on the `VmInterferenceModel` for the hypervisor interface. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 4 +- .../simulator/compute/SimMachineBenchmarks.kt | 4 +- .../compute/kernel/SimAbstractHypervisor.kt | 48 ++--- .../compute/kernel/SimFairShareHypervisor.kt | 8 +- .../kernel/SimFairShareHypervisorProvider.kt | 4 +- .../simulator/compute/kernel/SimHypervisor.kt | 4 +- .../compute/kernel/SimHypervisorProvider.kt | 7 +- .../kernel/SimSpaceSharedHypervisorProvider.kt | 2 - .../kernel/interference/VmInterferenceDomain.kt | 37 +--- .../kernel/interference/VmInterferenceKey.kt | 28 --- .../kernel/interference/VmInterferenceMember.kt | 47 +++++ .../kernel/interference/VmInterferenceModel.kt | 197 ++++++++++----------- .../compute/kernel/SimFairShareHypervisorTest.kt | 12 +- 13 files changed, 170 insertions(+), 232 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 908353f0..628f324b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -92,7 +92,7 @@ public class SimHost( * The hypervisor to run multiple workloads. */ private val hypervisor: SimHypervisor = hypervisorProvider - .create(engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain) + .create(engine, scalingGovernor = scalingGovernor) /** * The virtual machines running on the hypervisor. @@ -144,7 +144,7 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val interferenceKey = interferenceDomain?.createKey(key.name) + val interferenceKey = interferenceDomain?.getMember(key.name) val machine = hypervisor.newMachine(key.flavor.toMachineModel(), interferenceKey) val newGuest = Guest( scope.coroutineContext, diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index fbcc99c3..02b48fa7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -107,7 +107,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine, null, null) + val hypervisor = SimFairShareHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } @@ -129,7 +129,7 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine, null, null) + val hypervisor = SimFairShareHypervisor(engine, null) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index b3898004..98dab28f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -25,8 +25,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey +import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload @@ -42,8 +41,7 @@ import kotlin.math.roundToLong */ public abstract class SimAbstractHypervisor( protected val engine: FlowEngine, - private val scalingGovernor: ScalingGovernor?, - protected val interferenceDomain: VmInterferenceDomain? = null + private val scalingGovernor: ScalingGovernor? ) : SimHypervisor, FlowConvergenceListener { /** * The machine on which the hypervisor runs. @@ -93,7 +91,7 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf() /* SimHypervisor */ - override fun newMachine(model: MachineModel, interferenceKey: VmInterferenceKey?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceKey: VmInterferenceMember?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceKey) _vms.add(vm) @@ -163,7 +161,7 @@ public abstract class SimAbstractHypervisor( */ private inner class VirtualMachine( model: MachineModel, - private val interferenceKey: VmInterferenceKey? = null + private val interferenceKey: VmInterferenceMember? = null ) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { /** * A flag to indicate that the machine is closed. @@ -180,7 +178,7 @@ public abstract class SimAbstractHypervisor( */ override val counters: SimHypervisorCounters get() = _counters - @JvmField val _counters = VmCountersImpl(cpus, interferenceDomain, interferenceKey) + @JvmField val _counters = VmCountersImpl(cpus, interferenceKey) /** * The CPU capacity of the hypervisor in MHz. @@ -206,17 +204,18 @@ public abstract class SimAbstractHypervisor( return super.startWorkload( object : SimWorkload { override fun onStart(ctx: SimMachineContext) { + val interferenceKey = interferenceKey try { - joinInterferenceDomain() + interferenceKey?.activate() workload.onStart(ctx) } catch (cause: Throwable) { - leaveInterferenceDomain() + interferenceKey?.deactivate() throw cause } } override fun onStop(ctx: SimMachineContext) { - leaveInterferenceDomain() + interferenceKey?.deactivate() workload.onStop(ctx) } }, @@ -236,26 +235,6 @@ public abstract class SimAbstractHypervisor( cpu.close() } } - - /** - * Join the interference domain of the hypervisor. - */ - private fun joinInterferenceDomain() { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.join(interferenceKey) - } - } - - /** - * Leave the interference domain of the hypervisor. - */ - private fun leaveInterferenceDomain() { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) - } - } } /** @@ -353,8 +332,7 @@ public abstract class SimAbstractHypervisor( */ private inner class VmCountersImpl( private val cpus: List, - private val interferenceDomain: VmInterferenceDomain?, - private val key: VmInterferenceKey? + private val key: VmInterferenceMember? ) : SimHypervisorCounters { private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 @@ -403,11 +381,11 @@ public abstract class SimAbstractHypervisor( cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() // Compute the performance penalty due to flow interference - val interferenceDomain = interferenceDomain - if (interferenceDomain != null) { + val key = key + if (key != null) { val mux = mux val load = mux.rate / mux.capacity.coerceAtLeast(1.0) - val penalty = 1 - interferenceDomain.apply(key, load) + val penalty = 1 - key.apply(load) val interference = (actualDelta * d * penalty).roundToLong() if (interference > 0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index a12c5517..66453835 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine @@ -36,15 +35,12 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer * concurrently using weighted fair sharing. * * @param engine The [FlowEngine] to manage the machine's resources. - * @param listener The listener for the convergence of the system. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. - * @param interferenceDomain The resource interference domain to which the hypervisor belongs. */ public class SimFairShareHypervisor( engine: FlowEngine, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain?, -) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) { + scalingGovernor: ScalingGovernor? +) : SimAbstractHypervisor(engine, scalingGovernor) { /** * The multiplexer that distributes the computing capacity. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index 204b4860..ad8177d3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.flow.FlowEngine /** @@ -35,6 +34,5 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain?, - ): SimHypervisor = SimFairShareHypervisor(engine, scalingGovernor, interferenceDomain) + ): SimHypervisor = SimFairShareHypervisor(engine, scalingGovernor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 229e569c..f53d0c5d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey +import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload @@ -68,7 +68,7 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceKey The key of the machine in the interference model. */ - public fun newMachine(model: MachineModel, interferenceKey: VmInterferenceKey? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceKey: VmInterferenceMember? = null): SimVirtualMachine /** * Remove the specified [machine] from the hypervisor. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index b7e8760a..6ee523fd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.flow.FlowEngine /** @@ -41,9 +40,5 @@ public interface SimHypervisorProvider { /** * Create a new [SimHypervisor] instance. */ - public fun create( - engine: FlowEngine, - scalingGovernor: ScalingGovernor? = null, - interferenceDomain: VmInterferenceDomain? = null, - ): SimHypervisor + public fun create(engine: FlowEngine, scalingGovernor: ScalingGovernor? = null): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index 96b73e69..f7456797 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.flow.FlowEngine /** @@ -35,6 +34,5 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain?, ): SimHypervisor = SimSpaceSharedHypervisor(engine, scalingGovernor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 5220fa2d..3b355f1e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -27,40 +27,11 @@ package org.opendc.simulator.compute.kernel.interference */ public interface VmInterferenceDomain { /** - * Compute the performance score of a participant in this interference domain. - * - * @param key The participant to obtain the score of or `null` if the participant has no key. - * @param load The overall load on the interference domain. - * @return A score representing the performance score to be applied to the resource consumer, with 1 - * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. - */ - public fun apply(key: VmInterferenceKey?, load: Double): Double - - /** - * Construct an [VmInterferenceKey] for the specified [id]. + * Return the [VmInterferenceMember] associated with the specified [id]. * * @param id The identifier of the virtual machine. - * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine - * does not participate in the domain. - */ - public fun createKey(id: String): VmInterferenceKey? - - /** - * Remove the specified [key] from this domain. - */ - public fun removeKey(key: VmInterferenceKey) - - /** - * Mark the specified [key] as active in this interference domain. - * - * @param key The key to join the interference domain with. - */ - public fun join(key: VmInterferenceKey) - - /** - * Mark the specified [key] as inactive in this interference domain. - * - * @param key The key of the virtual machine that wants to leave the domain. + * @return A [VmInterferenceMember] representing the virtual machine as part of the interference domain. `null` if + * the virtual machine does not participate in the domain. */ - public fun leave(key: VmInterferenceKey) + public fun getMember(id: String): VmInterferenceMember? } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt deleted file mode 100644 index 8d720ea9..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel.interference - -/** - * A key that uniquely identifies a participant of an interference domain. - */ -public interface VmInterferenceKey diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt new file mode 100644 index 00000000..2f3dd74b --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +/** + * A participant of an interference domain. + */ +public interface VmInterferenceMember { + /** + * Mark this member as active in this interference domain. + */ + public fun activate() + + /** + * Mark this member as inactive in this interference domain. + */ + public fun deactivate() + + /** + * Compute the performance score of the member in this interference domain. + * + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the member, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(load: Double): Double +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index 7cc545c8..bfda3121 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -35,11 +35,11 @@ import java.util.* * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ public class VmInterferenceModel private constructor( - private val targets: DoubleArray, - private val scores: DoubleArray, private val idMapping: Map, private val members: Array, private val membership: Array, + private val targets: DoubleArray, + private val scores: DoubleArray, private val size: Int, seed: Long, ) { @@ -51,13 +51,13 @@ public class VmInterferenceModel private constructor( /** * Construct a new [VmInterferenceDomain]. */ - public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(idMapping, members, membership, targets, scores, random) /** * Create a copy of this model with a different seed. */ public fun withSeed(seed: Long): VmInterferenceModel { - return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + return VmInterferenceModel(idMapping, members, membership, targets, scores, size, seed) } public companion object { @@ -171,11 +171,11 @@ public class VmInterferenceModel private constructor( @Suppress("UNCHECKED_CAST") return VmInterferenceModel( - newTargets, - newScores, idMapping, newMembers as Array, membership.map { it.value.toIntArray() }.toTypedArray(), + newTargets, + newScores, size, seed ) @@ -197,127 +197,62 @@ public class VmInterferenceModel private constructor( * Internal implementation of [VmInterferenceDomain]. */ private class InterferenceDomainImpl( - private val targets: DoubleArray, - private val scores: DoubleArray, private val idMapping: Map, private val members: Array, private val membership: Array, + private val targets: DoubleArray, + private val scores: DoubleArray, private val random: SplittableRandom ) : VmInterferenceDomain { /** * Keys registered with this domain. */ - private val keys = HashMap() + private val keys = HashMap() /** * The set of keys active in this domain. */ - private val activeKeys = ArrayList() - - override fun createKey(id: String): VmInterferenceKey? { - val intId = idMapping[id] ?: return null - return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } - } - - override fun removeKey(key: VmInterferenceKey) { - if (key !is InterferenceKeyImpl) { - return - } + private val activeKeys = ArrayList() - if (activeKeys.remove(key)) { - computeActiveGroups(key.id) - } + /** + * Queue of participants that will be removed or added to the active groups. + */ + private val participants = ArrayDeque() - keys.remove(key.id) + override fun getMember(id: String): VmInterferenceMember? { + val intId = idMapping[id] ?: return null + return keys.computeIfAbsent(intId) { InterferenceMemberImpl(it, this, members, targets, scores, random) } } - override fun join(key: VmInterferenceKey) { - if (key !is InterferenceKeyImpl) { - return - } + override fun toString(): String = "VmInterferenceDomain" - if (key.acquire()) { - val pos = activeKeys.binarySearch(key) - if (pos < 0) { - activeKeys.add(-pos - 1, key) - } - computeActiveGroups(key.id) + fun join(key: InterferenceMemberImpl) { + val activeKeys = activeKeys + val pos = activeKeys.binarySearch(key) + if (pos < 0) { + activeKeys.add(-pos - 1, key) } - } - override fun leave(key: VmInterferenceKey) { - if (key is InterferenceKeyImpl && key.release()) { - activeKeys.remove(key) - computeActiveGroups(key.id) - } + computeActiveGroups(activeKeys, key.id) } - override fun apply(key: VmInterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } - - val groups = key.groups - val groupSize = groups.size - - if (groupSize == 0) { - return 1.0 - } - - val targets = targets - val scores = scores - var low = 0 - var high = groups.size - 1 - - var group = -1 - var score = 1.0 - - // Perform binary search over the groups based on target load - while (low <= high) { - val mid = low + high ushr 1 - val midGroup = groups[mid] - val target = targets[midGroup] - - if (target < load) { - low = mid + 1 - group = midGroup - score = scores[midGroup] - } else if (target > load) { - high = mid - 1 - } else { - group = midGroup - score = scores[midGroup] - break - } - } - - return if (group >= 0 && random.nextInt(members[group].size) == 0) { - score - } else { - 1.0 - } + fun leave(key: InterferenceMemberImpl) { + val activeKeys = activeKeys + activeKeys.remove(key) + computeActiveGroups(activeKeys, key.id) } - override fun toString(): String = "VmInterferenceDomain" - - /** - * Queue of participants that will be removed or added to the active groups. - */ - private val _participants = ArrayDeque() - /** - * (Re-)Compute the active groups. + * (Re-)compute the active groups. */ - private fun computeActiveGroups(id: Int) { - val activeKeys = activeKeys - val groups = membership[id] - + private fun computeActiveGroups(activeKeys: ArrayList, id: Int) { if (activeKeys.isEmpty()) { return } + val groups = membership[id] val members = members - val participants = _participants + val participants = participants for (group in groups) { val groupMembers = members[group] @@ -366,7 +301,14 @@ public class VmInterferenceModel private constructor( * * @param id The identifier of the member. */ - private class InterferenceKeyImpl(@JvmField val id: Int) : VmInterferenceKey, Comparable { + private class InterferenceMemberImpl( + @JvmField val id: Int, + private val domain: InterferenceDomainImpl, + private val members: Array, + private val targets: DoubleArray, + private val scores: DoubleArray, + private val random: SplittableRandom + ) : VmInterferenceMember, Comparable { /** * The active groups to which the key belongs. */ @@ -377,16 +319,57 @@ public class VmInterferenceModel private constructor( */ private var refCount: Int = 0 - /** - * Join the domain. - */ - fun acquire(): Boolean = refCount++ <= 0 + override fun activate() { + if (refCount++ <= 0) { + domain.join(this) + } + } - /** - * Leave the domain. - */ - fun release(): Boolean = --refCount <= 0 + override fun deactivate() { + if (--refCount <= 0) { + domain.leave(this) + } + } + + override fun apply(load: Double): Double { + val groups = groups + val groupSize = groups.size + + if (groupSize == 0) { + return 1.0 + } + + val targets = targets + val scores = scores + var low = 0 + var high = groups.size - 1 + + var group = -1 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + break + } + } + + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + scores[group] + } else { + 1.0 + } + } - override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) + override fun compareTo(other: InterferenceMemberImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index ab2a6d76..15d32002 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -77,7 +77,7 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, PerformanceScalingGovernor(), null) + val hypervisor = SimFairShareHypervisor(platform, PerformanceScalingGovernor()) launch { machine.runWorkload(hypervisor) @@ -128,7 +128,7 @@ internal class SimFairShareHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null, null) + val hypervisor = SimFairShareHypervisor(platform, null) launch { machine.runWorkload(hypervisor) @@ -167,7 +167,7 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null, null) + val hypervisor = SimFairShareHypervisor(platform, null) assertDoesNotThrow { launch { @@ -197,7 +197,7 @@ internal class SimFairShareHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null, interferenceDomain) + val hypervisor = SimFairShareHypervisor(platform, null) val duration = 5 * 60L val workloadA = @@ -225,11 +225,11 @@ internal class SimFairShareHypervisorTest { coroutineScope { launch { - val vm = hypervisor.newMachine(model, interferenceDomain.createKey("a")) + val vm = hypervisor.newMachine(model, interferenceDomain.getMember("a")) vm.runWorkload(workloadA) hypervisor.removeMachine(vm) } - val vm = hypervisor.newMachine(model, interferenceDomain.createKey("b")) + val vm = hypervisor.newMachine(model, interferenceDomain.getMember("b")) vm.runWorkload(workloadB) hypervisor.removeMachine(vm) } -- cgit v1.2.3 From 507ff6223d277ebc6744b92b4030d94f20a92a02 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 6 Sep 2022 10:41:38 +0200 Subject: perf(sim/compute): Prevent boxing in interference algorithm This change updates the performance interference algorithm to remove the boxing that happened due to using a generic collection for storing integers. This collection was accessed in the algorithm's hot path, so could cause slowdown. --- .../kernel/interference/VmInterferenceModel.kt | 82 ++++++++++++++++------ 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index bfda3121..b9eee536 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -221,7 +221,7 @@ public class VmInterferenceModel private constructor( override fun getMember(id: String): VmInterferenceMember? { val intId = idMapping[id] ?: return null - return keys.computeIfAbsent(intId) { InterferenceMemberImpl(it, this, members, targets, scores, random) } + return keys.computeIfAbsent(intId) { InterferenceMemberImpl(it, this, membership[it], members, targets, scores, random) } } override fun toString(): String = "VmInterferenceDomain" @@ -233,24 +233,24 @@ public class VmInterferenceModel private constructor( activeKeys.add(-pos - 1, key) } - computeActiveGroups(activeKeys, key.id) + computeActiveGroups(activeKeys, key) } fun leave(key: InterferenceMemberImpl) { val activeKeys = activeKeys activeKeys.remove(key) - computeActiveGroups(activeKeys, key.id) + computeActiveGroups(activeKeys, key) } /** * (Re-)compute the active groups. */ - private fun computeActiveGroups(activeKeys: ArrayList, id: Int) { + private fun computeActiveGroups(activeKeys: ArrayList, key: InterferenceMemberImpl) { if (activeKeys.isEmpty()) { return } - val groups = membership[id] + val groups = key.membership val members = members val participants = participants @@ -272,8 +272,11 @@ public class VmInterferenceModel private constructor( } else if (l > r) { j++ } else { - participants.add(rightEntry) - intersection++ + if (++intersection > 1) { + rightEntry.addGroup(group) + } else { + participants.add(rightEntry) + } i++ j++ @@ -282,14 +285,11 @@ public class VmInterferenceModel private constructor( while (true) { val participant = participants.poll() ?: break - val participantGroups = participant.groups + if (intersection <= 1) { - participantGroups.remove(group) + participant.removeGroup(group) } else { - val pos = participantGroups.binarySearch(group) - if (pos < 0) { - participantGroups.add(-pos - 1, group) - } + participant.addGroup(group) } } } @@ -304,6 +304,7 @@ public class VmInterferenceModel private constructor( private class InterferenceMemberImpl( @JvmField val id: Int, private val domain: InterferenceDomainImpl, + @JvmField val membership: IntArray, private val members: Array, private val targets: DoubleArray, private val scores: DoubleArray, @@ -312,7 +313,8 @@ public class VmInterferenceModel private constructor( /** * The active groups to which the key belongs. */ - @JvmField val groups: MutableList = ArrayList() + private var groups: IntArray = IntArray(2) + private var groupsSize: Int = 0 /** * The number of users of the interference key. @@ -332,18 +334,17 @@ public class VmInterferenceModel private constructor( } override fun apply(load: Double): Double { - val groups = groups - val groupSize = groups.size + val groupsSize = groupsSize - if (groupSize == 0) { + if (groupsSize == 0) { return 1.0 } + val groups = groups val targets = targets - val scores = scores - var low = 0 - var high = groups.size - 1 + var low = 0 + var high = groupsSize - 1 var group = -1 // Perform binary search over the groups based on target load @@ -370,6 +371,47 @@ public class VmInterferenceModel private constructor( } } + /** + * Add an active group to this member. + */ + fun addGroup(group: Int) { + var groups = groups + val groupsSize = groupsSize + val pos = groups.binarySearch(group, toIndex = groupsSize) + + if (pos >= 0) { + return + } + + val idx = -pos - 1 + + if (groups.size == groupsSize) { + val newSize = groupsSize + (groupsSize shr 1) + groups = groups.copyOf(newSize) + this.groups = groups + } + + groups.copyInto(groups, idx + 1, idx, groupsSize) + groups[idx] = group + this.groupsSize += 1 + } + + /** + * Remove an active group from this member. + */ + fun removeGroup(group: Int) { + val groups = groups + val groupsSize = groupsSize + val pos = groups.binarySearch(group, toIndex = groupsSize) + + if (pos < 0) { + return + } + + groups.copyInto(groups, pos, pos + 1, groupsSize) + this.groupsSize -= 1 + } + override fun compareTo(other: InterferenceMemberImpl): Int = id.compareTo(other.id) } } -- cgit v1.2.3 From 6171ab09f1df2ab3475a7b28ece383a9f87a77c5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Sep 2022 10:28:37 +0200 Subject: refactor(sim/compute): Extract Random dependency from interference model This change moves the Random dependency outside the interference model, to allow the interference model to be completely immutable and passable between different simulations. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 3 +- .../org/opendc/compute/simulator/SimHostTest.kt | 8 +++- .../compute/workload/ComputeServiceHelper.kt | 24 +++++++----- .../experiments/capelin/CapelinBenchmarks.kt | 5 ++- .../opendc/experiments/capelin/CapelinRunner.kt | 5 ++- .../experiments/capelin/CapelinIntegrationTest.kt | 37 +++++++++++-------- .../simulator/compute/SimMachineBenchmarks.kt | 10 +++-- .../compute/kernel/SimAbstractHypervisor.kt | 17 ++++++--- .../compute/kernel/SimFairShareHypervisor.kt | 7 +++- .../kernel/SimFairShareHypervisorProvider.kt | 4 +- .../compute/kernel/SimHypervisorProvider.kt | 3 +- .../compute/kernel/SimSpaceSharedHypervisor.kt | 8 +++- .../kernel/SimSpaceSharedHypervisorProvider.kt | 4 +- .../kernel/interference/VmInterferenceMember.kt | 5 ++- .../kernel/interference/VmInterferenceModel.kt | 43 +++++++--------------- .../compute/kernel/SimFairShareHypervisorTest.kt | 13 +++++-- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 19 +++++++--- .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 3 +- .../opendc/workflow/service/WorkflowServiceTest.kt | 1 + 19 files changed, 130 insertions(+), 89 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 628f324b..ece3f752 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -62,6 +62,7 @@ public class SimHost( context: CoroutineContext, engine: FlowEngine, hypervisorProvider: SimHypervisorProvider, + random: SplittableRandom, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), @@ -92,7 +93,7 @@ public class SimHost( * The hypervisor to run multiple workloads. */ private val hypervisor: SimHypervisor = hypervisorProvider - .create(engine, scalingGovernor = scalingGovernor) + .create(engine, random, scalingGovernor = scalingGovernor) /** * The virtual machines running on the hypervisor. diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 5ba4a667..0b2285e5 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -67,6 +67,7 @@ internal class SimHostTest { fun testOvercommitted() = runBlockingSimulation { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) + val random = SplittableRandom(1) val host = SimHost( uid = UUID.randomUUID(), name = "test", @@ -74,7 +75,8 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - SimFairShareHypervisorProvider() + SimFairShareHypervisorProvider(), + random, ) val vmImageA = MockImage( UUID.randomUUID(), @@ -149,6 +151,7 @@ internal class SimHostTest { fun testFailure() = runBlockingSimulation { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) + val random = SplittableRandom(1) val host = SimHost( uid = UUID.randomUUID(), name = "test", @@ -156,7 +159,8 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - SimFairShareHypervisorProvider() + SimFairShareHypervisorProvider(), + random ) val image = MockImage( UUID.randomUUID(), diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index fddb4890..879ef072 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -54,6 +54,7 @@ public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, scheduler: ComputeScheduler, + seed: Long, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, schedulingQuantum: Duration = Duration.ofMinutes(5) @@ -66,12 +67,17 @@ public class ComputeServiceHelper( /** * The [FlowEngine] to simulate the hosts. */ - private val _engine = FlowEngine(context, clock) + private val engine = FlowEngine(context, clock) /** * The hosts that belong to this class. */ - private val _hosts = mutableSetOf() + private val hosts = mutableSetOf() + + /** + * The source of randomness. + */ + private val random = SplittableRandom(seed) init { val service = createService(scheduler, schedulingQuantum) @@ -82,18 +88,15 @@ public class ComputeServiceHelper( * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. * * @param trace The trace to simulate. - * @param seed The seed for the simulation. * @param servers A list to which the created servers is added. * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). */ public suspend fun run( trace: List, - seed: Long, servers: MutableList? = null, submitImmediately: Boolean = false ) { - val random = Random(seed) - val injector = failureModel?.createInjector(context, clock, service, random) + val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) val client = service.newClient() // Create new image for the virtual machine @@ -170,14 +173,15 @@ public class ComputeServiceHelper( spec.model, spec.meta, context, - _engine, + engine, spec.hypervisor, + random, powerDriver = spec.powerDriver, interferenceDomain = interferenceModel?.newDomain(), optimize = optimize ) - require(_hosts.add(host)) { "Host with uid ${spec.uid} already exists" } + require(hosts.add(host)) { "Host with uid ${spec.uid} already exists" } service.addHost(host) return host @@ -186,11 +190,11 @@ public class ComputeServiceHelper( override fun close() { service.close() - for (host in _hosts) { + for (host in hosts) { host.close() } - _hosts.clear() + hosts.clear() } /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index fd2c26f0..074ffc3e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -68,12 +68,13 @@ class CapelinBenchmarks { val runner = ComputeServiceHelper( coroutineContext, clock, - computeScheduler + computeScheduler, + seed = 0L, ) try { runner.apply(topology, isOptimized) - runner.run(vms, 0) + runner.run(vms) } finally { runner.close() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index cca5b6cf..2f417172 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -73,8 +73,9 @@ public class CapelinRunner( coroutineContext, clock, computeScheduler, + seed, failureModel, - interferenceModel?.withSeed(seed) + interferenceModel?.takeIf { operationalPhenomena.hasInterference } ) val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) @@ -104,7 +105,7 @@ public class CapelinRunner( runner.apply(topology, optimize = true) // Run the workload trace - runner.run(vms, seeder.nextLong(), servers) + runner.run(vms, servers) // Stop the metric collection exporter?.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 368b0086..ff9faef7 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -81,11 +81,13 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val (workload, _) = createTestWorkload(1.0) + val seed = 0L + val (workload, _) = createTestWorkload(1.0, seed) val runner = ComputeServiceHelper( coroutineContext, clock, - computeScheduler + computeScheduler, + seed, ) val topology = createTopology() @@ -94,7 +96,7 @@ class CapelinIntegrationTest { try { runner.apply(topology) - runner.run(workload, 0, servers) + runner.run(workload, servers) val serviceMetrics = runner.service.getSchedulerStats() println( @@ -129,12 +131,13 @@ class CapelinIntegrationTest { */ @Test fun testSmall() = runBlockingSimulation { - val seed = 1 + val seed = 1L val (workload, _) = createTestWorkload(0.25, seed) val runner = ComputeServiceHelper( coroutineContext, clock, - computeScheduler + computeScheduler, + seed, ) val topology = createTopology("single") val servers = mutableListOf() @@ -142,7 +145,7 @@ class CapelinIntegrationTest { try { runner.apply(topology) - runner.run(workload, seed.toLong(), servers) + runner.run(workload, servers) val serviceMetrics = runner.service.getSchedulerStats() println( @@ -173,14 +176,15 @@ class CapelinIntegrationTest { */ @Test fun testInterference() = runBlockingSimulation { - val seed = 0 + val seed = 0L val (workload, interferenceModel) = createTestWorkload(1.0, seed) val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler, - interferenceModel = interferenceModel?.withSeed(seed.toLong()) + seed, + interferenceModel = interferenceModel ) val topology = createTopology("single") val servers = mutableListOf() @@ -188,7 +192,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(workload, seed.toLong(), servers) + simulator.run(workload, servers) val serviceMetrics = simulator.service.getSchedulerStats() println( @@ -218,11 +222,12 @@ class CapelinIntegrationTest { */ @Test fun testFailures() = runBlockingSimulation { - val seed = 1 + val seed = 0L val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler, + seed, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") @@ -232,7 +237,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(workload, seed.toLong(), servers) + simulator.run(workload, servers) val serviceMetrics = simulator.service.getSchedulerStats() println( @@ -250,20 +255,20 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(10982026, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740058, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(2590260605, monitor.uptime) { "Uptime incorrect" } }, ) } /** * Obtain the trace reader for the test. */ - private fun createTestWorkload(fraction: Double, seed: Int = 0): ComputeWorkload.Resolved { + private fun createTestWorkload(fraction: Double, seed: Long): ComputeWorkload.Resolved { val source = trace("bitbrains-small").sampleByLoad(fraction) - return source.resolve(workloadLoader, Random(seed.toLong())) + return source.resolve(workloadLoader, Random(seed)) } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 02b48fa7..c3332d66 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -37,6 +37,7 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine import org.openjdk.jmh.annotations.* +import java.util.SplittableRandom import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -85,7 +86,8 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } @@ -107,7 +109,8 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimFairShareHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } @@ -129,7 +132,8 @@ class SimMachineBenchmarks { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimFairShareHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 98dab28f..2cabeece 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -31,6 +31,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* import org.opendc.simulator.flow.mux.FlowMultiplexer +import java.util.SplittableRandom import kotlin.math.roundToLong /** @@ -38,10 +39,12 @@ import kotlin.math.roundToLong * * @param engine The [FlowEngine] to drive the simulation. * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param random A randomness generator for the interference calculations. */ public abstract class SimAbstractHypervisor( protected val engine: FlowEngine, - private val scalingGovernor: ScalingGovernor? + private val scalingGovernor: ScalingGovernor?, + private val random: SplittableRandom ) : SimHypervisor, FlowConvergenceListener { /** * The machine on which the hypervisor runs. @@ -142,8 +145,12 @@ public abstract class SimAbstractHypervisor( if (delta > 0) { _counters.record() + val mux = mux + val load = mux.rate / mux.capacity.coerceAtLeast(1.0) + val random = random + for (vm in _vms) { - vm._counters.record() + vm._counters.record(random, load) } } @@ -351,7 +358,7 @@ public abstract class SimAbstractHypervisor( /** * Record the CPU time of the hypervisor. */ - fun record() { + fun record(random: SplittableRandom, load: Double) { val cpuTime = _cpuTime val previous = _previous @@ -383,9 +390,7 @@ public abstract class SimAbstractHypervisor( // Compute the performance penalty due to flow interference val key = key if (key != null) { - val mux = mux - val load = mux.rate / mux.capacity.coerceAtLeast(1.0) - val penalty = 1 - key.apply(load) + val penalty = 1 - key.apply(random, load) val interference = (actualDelta * d * penalty).roundToLong() if (interference > 0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 66453835..4435a422 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -29,6 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.flow.mux.FlowMultiplexer import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer +import java.util.SplittableRandom /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] @@ -36,11 +37,13 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer * * @param engine The [FlowEngine] to manage the machine's resources. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. + * @param random A randomness generator for the interference calculations. */ public class SimFairShareHypervisor( engine: FlowEngine, - scalingGovernor: ScalingGovernor? -) : SimAbstractHypervisor(engine, scalingGovernor) { + scalingGovernor: ScalingGovernor?, + random: SplittableRandom +) : SimAbstractHypervisor(engine, scalingGovernor, random) { /** * The multiplexer that distributes the computing capacity. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index ad8177d3..c7008652 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.flow.FlowEngine +import java.util.* /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -33,6 +34,7 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, + random: SplittableRandom, scalingGovernor: ScalingGovernor?, - ): SimHypervisor = SimFairShareHypervisor(engine, scalingGovernor) + ): SimHypervisor = SimFairShareHypervisor(engine, scalingGovernor, random) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index 6ee523fd..020a2a60 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.flow.FlowEngine +import java.util.SplittableRandom /** * A service provider interface for constructing a [SimHypervisor]. @@ -40,5 +41,5 @@ public interface SimHypervisorProvider { /** * Create a new [SimHypervisor] instance. */ - public fun create(engine: FlowEngine, scalingGovernor: ScalingGovernor? = null): SimHypervisor + public fun create(engine: FlowEngine, random: SplittableRandom, scalingGovernor: ScalingGovernor? = null): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 7976077c..51bf4ce5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -27,14 +27,20 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.flow.mux.FlowMultiplexer import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer +import java.util.SplittableRandom /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. + * + * @param engine The [FlowEngine] to manage the machine's resources. + * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. + * @param random A randomness generator for the interference calculations. */ public class SimSpaceSharedHypervisor( engine: FlowEngine, scalingGovernor: ScalingGovernor?, -) : SimAbstractHypervisor(engine, scalingGovernor) { + random: SplittableRandom +) : SimAbstractHypervisor(engine, scalingGovernor, random) { override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) override fun canFit(model: MachineModel): Boolean { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index f7456797..05c54528 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.flow.FlowEngine +import java.util.* /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -33,6 +34,7 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, + random: SplittableRandom, scalingGovernor: ScalingGovernor?, - ): SimHypervisor = SimSpaceSharedHypervisor(engine, scalingGovernor) + ): SimHypervisor = SimSpaceSharedHypervisor(engine, scalingGovernor, random) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt index 2f3dd74b..04203c63 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute.kernel.interference +import java.util.* + /** * A participant of an interference domain. */ @@ -39,9 +41,10 @@ public interface VmInterferenceMember { /** * Compute the performance score of the member in this interference domain. * + * @param random The source of randomness to apply when computing the performance score. * @param load The overall load on the interference domain. * @return A score representing the performance score to be applied to the member, with 1 * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. */ - public fun apply(load: Double): Double + public fun apply(random: SplittableRandom, load: Double): Double } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index b9eee536..3ea869d4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -32,7 +32,6 @@ import java.util.* * @param members The members belonging to each group. * @param membership The identifier of each key. * @param size The number of groups. - * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ public class VmInterferenceModel private constructor( private val idMapping: Map, @@ -40,25 +39,12 @@ public class VmInterferenceModel private constructor( private val membership: Array, private val targets: DoubleArray, private val scores: DoubleArray, - private val size: Int, - seed: Long, + private val size: Int ) { - /** - * A [SplittableRandom] used for selecting the virtual machines that are affected. - */ - private val random = SplittableRandom(seed) - /** * Construct a new [VmInterferenceDomain]. */ - public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(idMapping, members, membership, targets, scores, random) - - /** - * Create a copy of this model with a different seed. - */ - public fun withSeed(seed: Long): VmInterferenceModel { - return VmInterferenceModel(idMapping, members, membership, targets, scores, size, seed) - } + public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(idMapping, members, membership, targets, scores) public companion object { /** @@ -72,11 +58,6 @@ public class VmInterferenceModel private constructor( * Builder class for a [VmInterferenceModel] */ public class Builder internal constructor() { - /** - * The initial capacity of the builder. - */ - private val INITIAL_CAPACITY = 256 - /** * The target load of each group. */ @@ -125,7 +106,7 @@ public class VmInterferenceModel private constructor( /** * Build the [VmInterferenceModel]. */ - public fun build(seed: Long = 0): VmInterferenceModel { + public fun build(): VmInterferenceModel { val size = size val targets = _targets val scores = _scores @@ -176,8 +157,7 @@ public class VmInterferenceModel private constructor( membership.map { it.value.toIntArray() }.toTypedArray(), newTargets, newScores, - size, - seed + size ) } @@ -191,6 +171,13 @@ public class VmInterferenceModel private constructor( _targets = _targets.copyOf(newSize) _scores = _scores.copyOf(newSize) } + + private companion object { + /** + * The initial capacity of the builder. + */ + const val INITIAL_CAPACITY = 256 + } } /** @@ -202,7 +189,6 @@ public class VmInterferenceModel private constructor( private val membership: Array, private val targets: DoubleArray, private val scores: DoubleArray, - private val random: SplittableRandom ) : VmInterferenceDomain { /** * Keys registered with this domain. @@ -221,7 +207,7 @@ public class VmInterferenceModel private constructor( override fun getMember(id: String): VmInterferenceMember? { val intId = idMapping[id] ?: return null - return keys.computeIfAbsent(intId) { InterferenceMemberImpl(it, this, membership[it], members, targets, scores, random) } + return keys.computeIfAbsent(intId) { InterferenceMemberImpl(it, this, membership[it], members, targets, scores) } } override fun toString(): String = "VmInterferenceDomain" @@ -307,8 +293,7 @@ public class VmInterferenceModel private constructor( @JvmField val membership: IntArray, private val members: Array, private val targets: DoubleArray, - private val scores: DoubleArray, - private val random: SplittableRandom + private val scores: DoubleArray ) : VmInterferenceMember, Comparable { /** * The active groups to which the key belongs. @@ -333,7 +318,7 @@ public class VmInterferenceModel private constructor( } } - override fun apply(load: Double): Double { + override fun apply(random: SplittableRandom, load: Double): Double { val groupsSize = groupsSize if (groupsSize == 0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 15d32002..23d832e8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import java.util.* /** * Test suite for the [SimHypervisor] class. @@ -77,7 +78,8 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, PerformanceScalingGovernor()) + val random = SplittableRandom(1) + val hypervisor = SimFairShareHypervisor(platform, PerformanceScalingGovernor(), random) launch { machine.runWorkload(hypervisor) @@ -128,7 +130,8 @@ internal class SimFairShareHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null) + val random = SplittableRandom(1) + val hypervisor = SimFairShareHypervisor(platform, null, random) launch { machine.runWorkload(hypervisor) @@ -167,7 +170,8 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null) + val random = SplittableRandom(1) + val hypervisor = SimFairShareHypervisor(platform, null, random) assertDoesNotThrow { launch { @@ -197,7 +201,8 @@ internal class SimFairShareHypervisorTest { val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(platform, null) + val random = SplittableRandom(1) + val hypervisor = SimFairShareHypervisor(platform, null, random) val duration = 5 * 60L val workloadA = diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 0f533130..9471f548 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -40,6 +40,7 @@ import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import java.util.* /** * A test suite for the [SimSpaceSharedHypervisor]. @@ -75,7 +76,8 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } val vm = hypervisor.newMachine(machineModel) @@ -97,7 +99,8 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimRuntimeWorkload(duration) val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } yield() @@ -121,7 +124,8 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } yield() @@ -142,7 +146,8 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } yield() @@ -169,7 +174,8 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(engine, null, random) launch { machine.runWorkload(hypervisor) } yield() @@ -193,7 +199,8 @@ internal class SimSpaceSharedHypervisorTest { val machine = SimBareMetalMachine( interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter, null) + val random = SplittableRandom(1) + val hypervisor = SimSpaceSharedHypervisor(interpreter, null, random) launch { machine.runWorkload(hypervisor) } yield() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index c958bdb2..d5dbed1c 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -220,6 +220,7 @@ public class OpenDCRunner( coroutineContext, clock, computeScheduler, + seed = 0L, failureModel, interferenceModel.takeIf { phenomena.interference } ) @@ -230,7 +231,7 @@ public class OpenDCRunner( // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, seeder.nextLong(), servers) + simulator.run(vms, servers) val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 73d1b23b..f6fa2134 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -73,6 +73,7 @@ internal class WorkflowServiceTest { coroutineContext, clock, computeScheduler, + seed = 0, schedulingQuantum = Duration.ofSeconds(1) ) -- cgit v1.2.3 From 21270e0b4250bd6927e85227fa825cf8ed59aaed Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Sep 2022 10:55:13 +0200 Subject: refactor(compute): Add separate error host state This change adds a new HostState to indicate that the host is in an error state as opposed to being purposefully unavailable. --- .../org/opendc/compute/service/driver/HostState.kt | 11 ++++++++--- .../compute/service/internal/ComputeServiceImpl.kt | 2 +- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 23 ++++++++-------------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt index 6d85ee2d..ca6c625c 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/HostState.kt @@ -27,12 +27,17 @@ package org.opendc.compute.service.driver */ public enum class HostState { /** - * The host is up. + * The host is up and able to host guests. */ UP, /** - * The host is down. + * The host is in a (forced) down state and unable to host any guests. */ - DOWN + DOWN, + + /** + * The host is in an error state and unable to host any guests. + */ + ERROR, } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 21aaa19e..52ee780b 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -411,7 +411,7 @@ internal class ComputeServiceImpl( // Re-schedule on the new machine requestSchedulingCycle() } - HostState.DOWN -> { + else -> { logger.debug { "[${clock.instant()}] Host ${host.uid} state changed: $newState" } val hv = hostToView[host] ?: return diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index ece3f752..56b1c8d1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -59,7 +59,7 @@ public class SimHost( override val name: String, model: MachineModel, override val meta: Map, - context: CoroutineContext, + private val context: CoroutineContext, engine: FlowEngine, hypervisorProvider: SimHypervisorProvider, random: SplittableRandom, @@ -69,11 +69,6 @@ public class SimHost( private val interferenceDomain: VmInterferenceDomain? = null, private val optimize: Boolean = false ) : Host, AutoCloseable { - /** - * The [CoroutineScope] of the host bounded by the lifecycle of the host. - */ - private val scope: CoroutineScope = CoroutineScope(context + Job()) - /** * The clock instance used by the host. */ @@ -148,7 +143,7 @@ public class SimHost( val interferenceKey = interferenceDomain?.getMember(key.name) val machine = hypervisor.newMachine(key.flavor.toMachineModel(), interferenceKey) val newGuest = Guest( - scope.coroutineContext, + context, clock, this, hypervisor, @@ -195,8 +190,7 @@ public class SimHost( } override fun close() { - reset() - scope.cancel() + reset(HostState.DOWN) machine.cancel() } @@ -271,7 +265,7 @@ public class SimHost( override fun toString(): String = "SimHost[uid=$uid,name=$name,model=$model]" public suspend fun fail() { - reset() + reset(HostState.ERROR) for (guest in _guests) { guest.fail() @@ -310,7 +304,7 @@ public class SimHost( _state = HostState.UP hypervisor.onStart(ctx) } catch (cause: Throwable) { - _state = HostState.DOWN + _state = HostState.ERROR _ctx = null throw cause } @@ -320,7 +314,6 @@ public class SimHost( try { hypervisor.onStop(ctx) } finally { - _state = HostState.DOWN _ctx = null } } @@ -330,12 +323,12 @@ public class SimHost( /** * Reset the machine. */ - private fun reset() { + private fun reset(state: HostState) { updateUptime() // Stop the hypervisor _ctx?.close() - _state = HostState.DOWN + _state = state } /** @@ -386,7 +379,7 @@ public class SimHost( if (_state == HostState.UP) { _uptime += duration - } else if (_state == HostState.DOWN && scope.isActive) { + } else if (_state == HostState.ERROR) { // Only increment downtime if the machine is in a failure state _downtime += duration } -- cgit v1.2.3 From 8b6c15193281171bcb2e111f339ffb8da385332b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Sep 2022 11:12:01 +0200 Subject: refactor(compute): Simplify constructor of SimHost This change updates the constructor of SimHost to receive a `SimBareMetalMachine` and `SimHypervisor` directly instead of construction these objects itself. This ensures better testability and also simplifies the constructor of this class, especially when future changes to `SimBareMetalMachine` or `SimHypervisor` change their constructors. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 61 ++++------------------ .../org/opendc/compute/simulator/SimHostTest.kt | 25 +++++---- .../compute/workload/ComputeServiceHelper.kt | 12 +++-- .../opendc/simulator/compute/model/MachineModel.kt | 18 ++++++- 4 files changed, 49 insertions(+), 67 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 56b1c8d1..0fa91d52 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -35,17 +35,11 @@ import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.kernel.SimHypervisorProvider -import org.opendc.simulator.compute.kernel.cpufreq.PerformanceScalingGovernor -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.power.PowerDriver -import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowEngine +import java.time.Clock import java.time.Duration import java.time.Instant import java.util.* @@ -57,39 +51,20 @@ import kotlin.coroutines.CoroutineContext public class SimHost( override val uid: UUID, override val name: String, - model: MachineModel, override val meta: Map, private val context: CoroutineContext, - engine: FlowEngine, - hypervisorProvider: SimHypervisorProvider, - random: SplittableRandom, - scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), - powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), + private val clock: Clock, + private val machine: SimBareMetalMachine, + private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), private val interferenceDomain: VmInterferenceDomain? = null, private val optimize: Boolean = false ) : Host, AutoCloseable { - /** - * The clock instance used by the host. - */ - private val clock = engine.clock - /** * The event listeners registered with this host. */ private val listeners = mutableListOf() - /** - * The machine to run on. - */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver) - - /** - * The hypervisor to run multiple workloads. - */ - private val hypervisor: SimHypervisor = hypervisorProvider - .create(engine, random, scalingGovernor = scalingGovernor) - /** * The virtual machines running on the hypervisor. */ @@ -109,7 +84,11 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.sumOf { it.frequency }, model.cpus.size, model.memory.sumOf { it.size }) + override val model: HostModel = HostModel( + machine.model.cpus.sumOf { it.frequency }, + machine.model.cpus.size, + machine.model.memory.sumOf { it.size } + ) /** * The [GuestListener] that listens for guest events. @@ -341,26 +320,8 @@ public class SimHost( val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode, frequency = cpuCapacity) } val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - return MachineModel(processingUnits, memoryUnits).optimize() - } - - /** - * Optimize the [MachineModel] for simulation. - */ - private fun MachineModel.optimize(): MachineModel { - if (!optimize) { - return this - } - - val originalCpu = cpus[0] - val freq = cpus.sumOf { it.frequency } - val processingNode = originalCpu.node.copy(coreCount = 1) - val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) - - val memorySize = memory.sumOf { it.size } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - - return MachineModel(processingUnits, memoryUnits) + val model = MachineModel(processingUnits, memoryUnits) + return if (optimize) model.optimize() else model } private var _lastReport = clock.millis() diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 0b2285e5..06500a06 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -30,11 +30,14 @@ import org.junit.jupiter.api.assertAll import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.kernel.SimFairShareHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -67,16 +70,16 @@ internal class SimHostTest { fun testOvercommitted() = runBlockingSimulation { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) - val random = SplittableRandom(1) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimFairShareHypervisor(engine, null, SplittableRandom(1)) val host = SimHost( uid = UUID.randomUUID(), name = "test", - model = machineModel, meta = emptyMap(), coroutineContext, - engine, - SimFairShareHypervisorProvider(), - random, + clock, + machine, + hypervisor ) val vmImageA = MockImage( UUID.randomUUID(), @@ -151,16 +154,16 @@ internal class SimHostTest { fun testFailure() = runBlockingSimulation { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) - val random = SplittableRandom(1) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimFairShareHypervisor(engine, null, SplittableRandom(1)) val host = SimHost( uid = UUID.randomUUID(), name = "test", - model = machineModel, meta = emptyMap(), coroutineContext, - engine, - SimFairShareHypervisorProvider(), - random + clock, + machine, + hypervisor ) val image = MockImage( UUID.randomUUID(), diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 879ef072..92652329 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -31,6 +31,7 @@ import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec +import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine @@ -167,16 +168,17 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { + val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) + val hypervisor = spec.hypervisor.create(engine, random) + val host = SimHost( spec.uid, spec.name, - spec.model, spec.meta, context, - engine, - spec.hypervisor, - random, - powerDriver = spec.powerDriver, + clock, + machine, + hypervisor, interferenceDomain = interferenceModel?.newDomain(), optimize = optimize ) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt index 7e4d7191..22dcaef4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt @@ -35,4 +35,20 @@ public data class MachineModel( public val memory: List, public val net: List = emptyList(), public val storage: List = emptyList() -) +) { + /** + * Optimize the [MachineModel] by merging all resources of the same type into a single resource with the combined + * capacity. Such configurations can be simulated more efficiently by OpenDC. + */ + public fun optimize(): MachineModel { + val originalCpu = cpus[0] + val freq = cpus.sumOf { it.frequency } + val processingNode = originalCpu.node.copy(coreCount = 1) + val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) + + val memorySize = memory.sumOf { it.size } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return MachineModel(processingUnits, memoryUnits) + } +} -- cgit v1.2.3 From 17fa7619f1d7e96680e018d3f12f333fb75cdac1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Sep 2022 14:45:12 +0200 Subject: refactor(sim/compute): Make interference domain independent of profile This change updates the virtual machine performance interference model so that the interference domain can be constructed independently of the interference profile. As a consequence, the construction of the topology now does not depend anymore on the interference profile. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 5 +- .../org/opendc/compute/simulator/internal/Guest.kt | 2 +- .../org/opendc/compute/simulator/SimHostTest.kt | 4 +- .../compute/workload/ComputeServiceHelper.kt | 25 ++- .../org/opendc/compute/workload/ComputeWorkload.kt | 8 +- .../compute/workload/ComputeWorkloadLoader.kt | 15 +- .../org/opendc/compute/workload/VirtualMachine.kt | 3 + .../workload/internal/CompositeComputeWorkload.kt | 12 +- .../workload/internal/HpcSampledComputeWorkload.kt | 6 +- .../internal/LoadSampledComputeWorkload.kt | 6 +- .../workload/internal/TraceComputeWorkload.kt | 3 +- .../experiments/capelin/CapelinBenchmarks.kt | 6 +- .../opendc/experiments/capelin/CapelinRunner.kt | 7 +- .../experiments/capelin/CapelinIntegrationTest.kt | 17 +- .../simulator/compute/SimMachineBenchmarks.kt | 6 +- .../compute/kernel/SimAbstractHypervisor.kt | 42 ++-- .../compute/kernel/SimFairShareHypervisor.kt | 11 +- .../kernel/SimFairShareHypervisorProvider.kt | 4 +- .../simulator/compute/kernel/SimHypervisor.kt | 4 +- .../compute/kernel/SimHypervisorProvider.kt | 8 +- .../compute/kernel/SimSpaceSharedHypervisor.kt | 11 +- .../kernel/SimSpaceSharedHypervisorProvider.kt | 4 +- .../kernel/interference/VmInterferenceDomain.kt | 112 +++++++++- .../kernel/interference/VmInterferenceMember.kt | 123 ++++++++++- .../kernel/interference/VmInterferenceModel.kt | 237 ++------------------- .../kernel/interference/VmInterferenceProfile.kt | 51 +++++ .../compute/kernel/SimFairShareHypervisorTest.kt | 21 +- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 12 +- .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 7 +- 29 files changed, 416 insertions(+), 356 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 0fa91d52..c04573b5 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -35,7 +35,6 @@ import org.opendc.compute.simulator.internal.Guest import org.opendc.compute.simulator.internal.GuestListener import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.SimHypervisor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.workload.SimWorkload @@ -57,7 +56,6 @@ public class SimHost( private val machine: SimBareMetalMachine, private val hypervisor: SimHypervisor, private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), - private val interferenceDomain: VmInterferenceDomain? = null, private val optimize: Boolean = false ) : Host, AutoCloseable { /** @@ -119,8 +117,7 @@ public class SimHost( val guest = guests.computeIfAbsent(server) { key -> require(canFit(key)) { "Server does not fit" } - val interferenceKey = interferenceDomain?.getMember(key.name) - val machine = hypervisor.newMachine(key.flavor.toMachineModel(), interferenceKey) + val machine = hypervisor.newMachine(key.flavor.toMachineModel()) val newGuest = Guest( context, clock, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index ea3c6549..cc084526 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -217,7 +217,7 @@ internal class Guest( */ private suspend fun runMachine(workload: SimWorkload) { delay(1) // TODO Introduce model for boot time - machine.runWorkload(workload, mapOf("driver" to host, "server" to server)) + machine.runWorkload(workload, mapOf("driver" to host, "server" to server) + server.meta) } /** diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 06500a06..879f15b2 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -71,7 +71,7 @@ internal class SimHostTest { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(engine, null, SplittableRandom(1)) + val hypervisor = SimFairShareHypervisor(engine, SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", @@ -155,7 +155,7 @@ internal class SimHostTest { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(engine, null, SplittableRandom(1)) + val hypervisor = SimFairShareHypervisor(engine, SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 92652329..ad132efe 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -32,7 +32,7 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock @@ -48,7 +48,6 @@ import kotlin.math.max * @param clock [Clock] instance tracking simulation time. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. - * @param interferenceModel The model to use for performance interference. * @param schedulingQuantum The scheduling quantum of the scheduler. */ public class ComputeServiceHelper( @@ -57,7 +56,6 @@ public class ComputeServiceHelper( scheduler: ComputeScheduler, seed: Long, private val failureModel: FailureModel? = null, - private val interferenceModel: VmInterferenceModel? = null, schedulingQuantum: Duration = Duration.ofMinutes(5) ) : AutoCloseable { /** @@ -91,11 +89,13 @@ public class ComputeServiceHelper( * @param trace The trace to simulate. * @param servers A list to which the created servers is added. * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun run( trace: List, servers: MutableList? = null, - submitImmediately: Boolean = false + submitImmediately: Boolean = false, + interference: Boolean = false, ) { val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) val client = service.newClient() @@ -125,10 +125,16 @@ public class ComputeServiceHelper( delay(max(0, (start - offset) - now)) } - launch { - val workloadOffset = -offset + 300001 - val workload = SimTraceWorkload(entry.trace, workloadOffset) + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload(entry.trace, workloadOffset) + val meta = mutableMapOf("workload" to workload) + + val interferenceProfile = entry.interferenceProfile + if (interference && interferenceProfile != null) { + meta["interference-profile"] = interferenceProfile + } + launch { val server = client.newServer( entry.name, image, @@ -138,7 +144,7 @@ public class ComputeServiceHelper( entry.memCapacity, meta = if (entry.cpuCapacity > 0.0) mapOf("cpu-capacity" to entry.cpuCapacity) else emptyMap() ), - meta = mapOf("workload" to workload) + meta = meta ) servers?.add(server) @@ -169,7 +175,7 @@ public class ComputeServiceHelper( */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) - val hypervisor = spec.hypervisor.create(engine, random) + val hypervisor = spec.hypervisor.create(engine, random, interferenceDomain = VmInterferenceDomain()) val host = SimHost( spec.uid, @@ -179,7 +185,6 @@ public class ComputeServiceHelper( clock, machine, hypervisor, - interferenceDomain = interferenceModel?.newDomain(), optimize = optimize ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index aa0b5eaf..78002c2f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -22,7 +22,6 @@ package org.opendc.compute.workload -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import java.util.* /** @@ -32,10 +31,5 @@ public interface ComputeWorkload { /** * Resolve the workload into a list of [VirtualMachine]s to simulate. */ - public fun resolve(loader: ComputeWorkloadLoader, random: Random): Resolved - - /** - * A concrete instance of a workload. - */ - public data class Resolved(val vms: List, val interferenceModel: VmInterferenceModel?) + public fun resolve(loader: ComputeWorkloadLoader, random: Random): List } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 7ed04994..387a3ec2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -48,7 +48,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap>() + private val cache = ConcurrentHashMap>>() /** * Read the fragments into memory. @@ -87,7 +87,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(trace: Trace, fragments: Map): List { + private fun parseMeta(trace: Trace, fragments: Map, interferenceModel: VmInterferenceModel): List { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() val idCol = reader.resolve(RESOURCE_ID) @@ -128,7 +128,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { totalLoad, submissionTime, endTime, - builder.build() + builder.build(), + interferenceModel.getProfile(id) ) ) } @@ -159,7 +160,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val modelBuilder = VmInterferenceModel.builder() while (reader.nextRow()) { - @Suppress("UNCHECKED_CAST") val members = reader.getSet(membersCol, String::class.java)!! val target = reader.getDouble(targetCol) val score = reader.getDouble(scoreCol) @@ -177,7 +177,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Load the trace with the specified [name] and [format]. */ - public fun get(name: String, format: String): ComputeWorkload.Resolved { + public fun get(name: String, format: String): List { val ref = cache.compute(name) { key, oldVal -> val inst = oldVal?.get() if (inst == null) { @@ -188,11 +188,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val trace = Trace.open(path, format) val fragments = parseFragments(trace) - val vms = parseMeta(trace, fragments) val interferenceModel = parseInterferenceModel(trace) - val instance = ComputeWorkload.Resolved(vms, interferenceModel) + val vms = parseMeta(trace, fragments, interferenceModel) - SoftReference(instance) + SoftReference(vms) } else { oldVal } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 88e80719..8560b537 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -22,6 +22,7 @@ package org.opendc.compute.workload +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.workload.SimTrace import java.time.Instant import java.util.* @@ -37,6 +38,7 @@ import java.util.* * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. * @param trace The trace that belong to this VM. + * @param interferenceProfile The interference profile of this virtual machine. */ public data class VirtualMachine( val uid: UUID, @@ -48,4 +50,5 @@ public data class VirtualMachine( val startTime: Instant, val stopTime: Instant, val trace: SimTrace, + val interferenceProfile: VmInterferenceProfile? ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index 1959c48d..9b2bec55 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -37,17 +37,17 @@ internal class CompositeComputeWorkload(val sources: Map { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } - val totalLoad = traces.sumOf { (_, w) -> w.vms.sumOf { it.totalLoad } } + val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } val res = mutableListOf() - for ((fraction, w) in traces) { + for ((fraction, vms) in traces) { var currentLoad = 0.0 - for (entry in w.vms) { + for (entry in vms) { val entryLoad = entry.totalLoad if ((currentLoad + entryLoad) / totalLoad > fraction) { break @@ -58,9 +58,9 @@ internal class CompositeComputeWorkload(val sources: Map w.vms.size } + val vmCount = traces.sumOf { (_, vms) -> vms.size } logger.info { "Sampled $vmCount VMs into subset of ${res.size} VMs" } - return ComputeWorkload.Resolved(res, null) + return res } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 84a77f0f..52f4c672 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -45,8 +45,8 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti */ private val pattern = Regex("^(ComputeNode|cn).*") - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { - val (vms, interferenceModel) = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List { + val vms = source.resolve(loader, random) val (hpc, nonHpc) = vms.partition { entry -> val name = entry.name @@ -130,7 +130,7 @@ internal class HpcSampledComputeWorkload(val source: ComputeWorkload, val fracti logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return ComputeWorkload.Resolved(res, interferenceModel) + return res } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index bc13560c..ef6de729 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -37,8 +37,8 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract */ private val logger = KotlinLogging.logger {} - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { - val (vms, interferenceModel) = source.resolve(loader, random) + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List { + val vms = source.resolve(loader, random) val res = mutableListOf() val totalLoad = vms.sumOf { it.totalLoad } @@ -56,6 +56,6 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } - return ComputeWorkload.Resolved(res, interferenceModel) + return res } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index dc9abaef..c20cb8f3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,13 +24,14 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.VirtualMachine import java.util.* /** * A [ComputeWorkload] from a trace. */ internal class TraceComputeWorkload(val name: String, val format: String) : ComputeWorkload { - override fun resolve(loader: ComputeWorkloadLoader, random: Random): ComputeWorkload.Resolved { + override fun resolve(loader: ComputeWorkloadLoader, random: Random): List { return loader.get(name, format) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 074ffc3e..c09ce96a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -55,7 +55,7 @@ class CapelinBenchmarks { fun setUp() { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) val source = trace("bitbrains-small") - vms = source.resolve(loader, Random(1L)).vms + vms = trace("bitbrains-small").resolve(loader, Random(1L)) topology = checkNotNull(object {}.javaClass.getResourceAsStream("/topology.txt")).use { clusterTopology(it) } } @@ -69,12 +69,12 @@ class CapelinBenchmarks { coroutineContext, clock, computeScheduler, - seed = 0L, + seed = 0L ) try { runner.apply(topology, isOptimized) - runner.run(vms) + runner.run(vms, interference = true) } finally { runner.close() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 2f417172..7be09ff5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -68,14 +68,13 @@ public class CapelinRunner( grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null - val (vms, interferenceModel) = scenario.workload.source.resolve(workloadLoader, seeder) + val vms = scenario.workload.source.resolve(workloadLoader, seeder) val runner = ComputeServiceHelper( coroutineContext, clock, computeScheduler, seed, - failureModel, - interferenceModel?.takeIf { operationalPhenomena.hasInterference } + failureModel ) val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) @@ -105,7 +104,7 @@ public class CapelinRunner( runner.apply(topology, optimize = true) // Run the workload trace - runner.run(vms, servers) + runner.run(vms, servers, interference = operationalPhenomena.hasInterference) // Stop the metric collection exporter?.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index ff9faef7..af846dd6 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -82,7 +82,7 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val seed = 0L - val (workload, _) = createTestWorkload(1.0, seed) + val workload = createTestWorkload(1.0, seed) val runner = ComputeServiceHelper( coroutineContext, clock, @@ -132,7 +132,7 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1L - val (workload, _) = createTestWorkload(0.25, seed) + val workload = createTestWorkload(0.25, seed) val runner = ComputeServiceHelper( coroutineContext, clock, @@ -177,14 +177,13 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 0L - val (workload, interferenceModel) = createTestWorkload(1.0, seed) + val workload = createTestWorkload(1.0, seed) val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler, - seed, - interferenceModel = interferenceModel + seed ) val topology = createTopology("single") val servers = mutableListOf() @@ -192,7 +191,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(workload, servers) + simulator.run(workload, servers, interference = true) val serviceMetrics = simulator.service.getSchedulerStats() println( @@ -213,7 +212,7 @@ class CapelinIntegrationTest { { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(477068, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(485510, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } @@ -231,7 +230,7 @@ class CapelinIntegrationTest { grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") - val (workload, _) = createTestWorkload(0.25, seed) + val workload = createTestWorkload(0.25, seed) val servers = mutableListOf() val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) @@ -266,7 +265,7 @@ class CapelinIntegrationTest { /** * Obtain the trace reader for the test. */ - private fun createTestWorkload(fraction: Double, seed: Long): ComputeWorkload.Resolved { + private fun createTestWorkload(fraction: Double, seed: Long): List { val source = trace("bitbrains-small").sampleByLoad(fraction) return source.resolve(workloadLoader, Random(seed)) } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index c3332d66..e862e4d1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -87,7 +87,7 @@ class SimMachineBenchmarks { engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, null, random) + val hypervisor = SimSpaceSharedHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } @@ -110,7 +110,7 @@ class SimMachineBenchmarks { engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, null, random) + val hypervisor = SimFairShareHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } @@ -133,7 +133,7 @@ class SimMachineBenchmarks { engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, null, random) + val hypervisor = SimFairShareHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 2cabeece..77088b74 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -25,7 +25,9 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.* import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload @@ -38,13 +40,15 @@ import kotlin.math.roundToLong * Abstract implementation of the [SimHypervisor] interface. * * @param engine The [FlowEngine] to drive the simulation. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. * @param random A randomness generator for the interference calculations. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param interferenceDomain The interference domain to which the hypervisor belongs. */ public abstract class SimAbstractHypervisor( protected val engine: FlowEngine, + private val random: SplittableRandom, private val scalingGovernor: ScalingGovernor?, - private val random: SplittableRandom + private val interferenceDomain: VmInterferenceDomain ) : SimHypervisor, FlowConvergenceListener { /** * The machine on which the hypervisor runs. @@ -94,9 +98,9 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf() /* SimHypervisor */ - override fun newMachine(model: MachineModel, interferenceKey: VmInterferenceMember?): SimVirtualMachine { + override fun newMachine(model: MachineModel): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model, interferenceKey) + val vm = VirtualMachine(model) _vms.add(vm) return vm } @@ -164,12 +168,8 @@ public abstract class SimAbstractHypervisor( * A virtual machine running on the hypervisor. * * @param model The machine model of the virtual machine. - * @param interferenceKey The interference key of this virtual machine. */ - private inner class VirtualMachine( - model: MachineModel, - private val interferenceKey: VmInterferenceMember? = null - ) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { + private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { /** * A flag to indicate that the machine is closed. */ @@ -185,7 +185,7 @@ public abstract class SimAbstractHypervisor( */ override val counters: SimHypervisorCounters get() = _counters - @JvmField val _counters = VmCountersImpl(cpus, interferenceKey) + @JvmField val _counters = VmCountersImpl(cpus, null) /** * The CPU capacity of the hypervisor in MHz. @@ -208,21 +208,27 @@ public abstract class SimAbstractHypervisor( override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { check(!isClosed) { "Machine is closed" } + val profile = meta["interference-profile"] as? VmInterferenceProfile + val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null + + val counters = _counters + counters.member = interferenceMember + return super.startWorkload( object : SimWorkload { override fun onStart(ctx: SimMachineContext) { - val interferenceKey = interferenceKey try { - interferenceKey?.activate() + interferenceMember?.activate() workload.onStart(ctx) } catch (cause: Throwable) { - interferenceKey?.deactivate() + interferenceMember?.deactivate() throw cause } } override fun onStop(ctx: SimMachineContext) { - interferenceKey?.deactivate() + interferenceMember?.deactivate() + counters.member = null workload.onStop(ctx) } }, @@ -339,7 +345,7 @@ public abstract class SimAbstractHypervisor( */ private inner class VmCountersImpl( private val cpus: List, - private val key: VmInterferenceMember? + @JvmField var member: VmInterferenceMember? ) : SimHypervisorCounters { private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 @@ -388,9 +394,9 @@ public abstract class SimAbstractHypervisor( cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() // Compute the performance penalty due to flow interference - val key = key - if (key != null) { - val penalty = 1 - key.apply(random, load) + val member = member + if (member != null) { + val penalty = 1 - member.apply(random, load) val interference = (actualDelta * d * penalty).roundToLong() if (interference > 0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 4435a422..fbee46da 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.FlowEngine @@ -35,15 +36,17 @@ import java.util.SplittableRandom * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] * concurrently using weighted fair sharing. * - * @param engine The [FlowEngine] to manage the machine's resources. - * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. + * @param engine The [FlowEngine] to drive the simulation. * @param random A randomness generator for the interference calculations. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param interferenceDomain The interference domain to which the hypervisor belongs. */ public class SimFairShareHypervisor( engine: FlowEngine, + random: SplittableRandom, scalingGovernor: ScalingGovernor?, - random: SplittableRandom -) : SimAbstractHypervisor(engine, scalingGovernor, random) { + interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() +) : SimAbstractHypervisor(engine, random, scalingGovernor, interferenceDomain) { /** * The multiplexer that distributes the computing capacity. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index c7008652..81dfc43d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.flow.FlowEngine import java.util.* @@ -36,5 +37,6 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { engine: FlowEngine, random: SplittableRandom, scalingGovernor: ScalingGovernor?, - ): SimHypervisor = SimFairShareHypervisor(engine, scalingGovernor, random) + interferenceDomain: VmInterferenceDomain, + ): SimHypervisor = SimFairShareHypervisor(engine, random, scalingGovernor, interferenceDomain) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index f53d0c5d..d8e4e7cd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload @@ -66,9 +65,8 @@ public interface SimHypervisor : SimWorkload { * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. - * @param interferenceKey The key of the machine in the interference model. */ - public fun newMachine(model: MachineModel, interferenceKey: VmInterferenceMember? = null): SimVirtualMachine + public fun newMachine(model: MachineModel): SimVirtualMachine /** * Remove the specified [machine] from the hypervisor. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index 020a2a60..2c86854e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.flow.FlowEngine import java.util.SplittableRandom @@ -41,5 +42,10 @@ public interface SimHypervisorProvider { /** * Create a new [SimHypervisor] instance. */ - public fun create(engine: FlowEngine, random: SplittableRandom, scalingGovernor: ScalingGovernor? = null): SimHypervisor + public fun create( + engine: FlowEngine, + random: SplittableRandom, + scalingGovernor: ScalingGovernor? = null, + interferenceDomain: VmInterferenceDomain + ): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 51bf4ce5..c32dd027 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.flow.mux.FlowMultiplexer @@ -32,15 +33,17 @@ import java.util.SplittableRandom /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. * - * @param engine The [FlowEngine] to manage the machine's resources. - * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. + * @param engine The [FlowEngine] to drive the simulation. * @param random A randomness generator for the interference calculations. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param interferenceDomain The interference domain to which the hypervisor belongs. */ public class SimSpaceSharedHypervisor( engine: FlowEngine, + random: SplittableRandom, scalingGovernor: ScalingGovernor?, - random: SplittableRandom -) : SimAbstractHypervisor(engine, scalingGovernor, random) { + interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() +) : SimAbstractHypervisor(engine, random, scalingGovernor, interferenceDomain) { override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) override fun canFit(model: MachineModel): Boolean { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index 05c54528..cc303bbd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.flow.FlowEngine import java.util.* @@ -36,5 +37,6 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { engine: FlowEngine, random: SplittableRandom, scalingGovernor: ScalingGovernor?, - ): SimHypervisor = SimSpaceSharedHypervisor(engine, scalingGovernor, random) + interferenceDomain: VmInterferenceDomain, + ): SimHypervisor = SimSpaceSharedHypervisor(engine, random, scalingGovernor, interferenceDomain) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 3b355f1e..6861823b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,16 +22,110 @@ package org.opendc.simulator.compute.kernel.interference +import java.util.ArrayDeque +import java.util.ArrayList +import java.util.WeakHashMap + /** - * The interference domain of a hypervisor. + * A domain where virtual machines may incur performance variability due to operating on the same resource and + * therefore causing interference. */ -public interface VmInterferenceDomain { +public class VmInterferenceDomain { + /** + * A cache to maintain a mapping between the active profiles in this domain. + */ + private val cache = WeakHashMap() + + /** + * The set of members active in this domain. + */ + private val activeKeys = ArrayList() + + /** + * Queue of participants that will be removed or added to the active groups. + */ + private val participants = ArrayDeque() + + /** + * Join this interference domain with the specified [profile] and return the [VmInterferenceMember] associated with + * the profile. If the member does not exist, it will be created. + */ + public fun join(profile: VmInterferenceProfile): VmInterferenceMember { + return cache.computeIfAbsent(profile) { key -> key.newMember(this) } + } + + /** + * Mark the specified [member] as active in this interference domain. + */ + internal fun activate(member: VmInterferenceMember) { + val activeKeys = activeKeys + val pos = activeKeys.binarySearch(member) + if (pos < 0) { + activeKeys.add(-pos - 1, member) + } + + computeActiveGroups(activeKeys, member) + } + /** - * Return the [VmInterferenceMember] associated with the specified [id]. - * - * @param id The identifier of the virtual machine. - * @return A [VmInterferenceMember] representing the virtual machine as part of the interference domain. `null` if - * the virtual machine does not participate in the domain. + * Mark the specified [member] as inactive in this interference domain. */ - public fun getMember(id: String): VmInterferenceMember? + internal fun deactivate(member: VmInterferenceMember) { + val activeKeys = activeKeys + activeKeys.remove(member) + computeActiveGroups(activeKeys, member) + } + + /** + * (Re-)compute the active groups. + */ + private fun computeActiveGroups(activeKeys: ArrayList, member: VmInterferenceMember) { + if (activeKeys.isEmpty()) { + return + } + + val groups = member.membership + val members = member.members + val participants = participants + + for (group in groups) { + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + if (++intersection > 1) { + rightEntry.addGroup(group) + } else { + participants.add(rightEntry) + } + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + + if (intersection <= 1) { + participant.removeGroup(group) + } else { + participant.addGroup(group) + } + } + } + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt index 04203c63..762bb568 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,16 +27,43 @@ import java.util.* /** * A participant of an interference domain. */ -public interface VmInterferenceMember { +public class VmInterferenceMember( + private val domain: VmInterferenceDomain, + private val model: VmInterferenceModel, + @JvmField internal val id: Int, + @JvmField internal val membership: IntArray, + @JvmField internal val members: Array, + private val targets: DoubleArray, + private val scores: DoubleArray +) : Comparable { + /** + * The active groups to which the key belongs. + */ + private var groups: IntArray = IntArray(2) + private var groupsSize: Int = 0 + + /** + * The number of users of the interference key. + */ + private var refCount: Int = 0 + /** * Mark this member as active in this interference domain. */ - public fun activate() + public fun activate() { + if (refCount++ <= 0) { + domain.activate(this) + } + } /** * Mark this member as inactive in this interference domain. */ - public fun deactivate() + public fun deactivate() { + if (--refCount <= 0) { + domain.deactivate(this) + } + } /** * Compute the performance score of the member in this interference domain. @@ -46,5 +73,91 @@ public interface VmInterferenceMember { * @return A score representing the performance score to be applied to the member, with 1 * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. */ - public fun apply(random: SplittableRandom, load: Double): Double + public fun apply(random: SplittableRandom, load: Double): Double { + val groupsSize = groupsSize + + if (groupsSize == 0) { + return 1.0 + } + + val groups = groups + val targets = targets + + var low = 0 + var high = groupsSize - 1 + var group = -1 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + break + } + } + + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + scores[group] + } else { + 1.0 + } + } + + /** + * Add an active group to this member. + */ + internal fun addGroup(group: Int) { + var groups = groups + val groupsSize = groupsSize + val pos = groups.binarySearch(group, toIndex = groupsSize) + + if (pos >= 0) { + return + } + + val idx = -pos - 1 + + if (groups.size == groupsSize) { + val newSize = groupsSize + (groupsSize shr 1) + groups = groups.copyOf(newSize) + this.groups = groups + } + + groups.copyInto(groups, idx + 1, idx, groupsSize) + groups[idx] = group + this.groupsSize += 1 + } + + /** + * Remove an active group from this member. + */ + internal fun removeGroup(group: Int) { + val groups = groups + val groupsSize = groupsSize + val pos = groups.binarySearch(group, toIndex = groupsSize) + + if (pos < 0) { + return + } + + groups.copyInto(groups, pos, pos + 1, groupsSize) + this.groupsSize -= 1 + } + + override fun compareTo(other: VmInterferenceMember): Int { + val cmp = model.hashCode().compareTo(other.model.hashCode()) + if (cmp != 0) { + return cmp + } + + return id.compareTo(other.id) + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index 3ea869d4..018c6e3d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -27,7 +27,7 @@ import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param targets The target load of each group. + * @param members The target load of each group. * @param scores The performance score of each group. * @param members The members belonging to each group. * @param membership The identifier of each key. @@ -42,9 +42,16 @@ public class VmInterferenceModel private constructor( private val size: Int ) { /** - * Construct a new [VmInterferenceDomain]. + * Return the [VmInterferenceProfile] associated with the specified [id]. + * + * @param id The identifier of the virtual machine. + * @return A [VmInterferenceProfile] representing the virtual machine as part of interference model or `null` if + * there is no profile for the virtual machine. */ - public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(idMapping, members, membership, targets, scores) + public fun getProfile(id: String): VmInterferenceProfile? { + val intId = idMapping[id] ?: return null + return VmInterferenceProfile(this, intId, membership[intId], members, targets, scores) + } public companion object { /** @@ -112,8 +119,8 @@ public class VmInterferenceModel private constructor( val scores = _scores val members = _members - val indices = Array(size) { it } - indices.sortWith( + val indices = IntArray(size) { it } + indices.sortedWith( Comparator { l, r -> var cmp = targets[l].compareTo(targets[r]) // Order by target load if (cmp != 0) { @@ -179,224 +186,4 @@ public class VmInterferenceModel private constructor( const val INITIAL_CAPACITY = 256 } } - - /** - * Internal implementation of [VmInterferenceDomain]. - */ - private class InterferenceDomainImpl( - private val idMapping: Map, - private val members: Array, - private val membership: Array, - private val targets: DoubleArray, - private val scores: DoubleArray, - ) : VmInterferenceDomain { - /** - * Keys registered with this domain. - */ - private val keys = HashMap() - - /** - * The set of keys active in this domain. - */ - private val activeKeys = ArrayList() - - /** - * Queue of participants that will be removed or added to the active groups. - */ - private val participants = ArrayDeque() - - override fun getMember(id: String): VmInterferenceMember? { - val intId = idMapping[id] ?: return null - return keys.computeIfAbsent(intId) { InterferenceMemberImpl(it, this, membership[it], members, targets, scores) } - } - - override fun toString(): String = "VmInterferenceDomain" - - fun join(key: InterferenceMemberImpl) { - val activeKeys = activeKeys - val pos = activeKeys.binarySearch(key) - if (pos < 0) { - activeKeys.add(-pos - 1, key) - } - - computeActiveGroups(activeKeys, key) - } - - fun leave(key: InterferenceMemberImpl) { - val activeKeys = activeKeys - activeKeys.remove(key) - computeActiveGroups(activeKeys, key) - } - - /** - * (Re-)compute the active groups. - */ - private fun computeActiveGroups(activeKeys: ArrayList, key: InterferenceMemberImpl) { - if (activeKeys.isEmpty()) { - return - } - - val groups = key.membership - val members = members - val participants = participants - - for (group in groups) { - val groupMembers = members[group] - - var i = 0 - var j = 0 - var intersection = 0 - - // Compute the intersection of the group members and the current active members - while (i < groupMembers.size && j < activeKeys.size) { - val l = groupMembers[i] - val rightEntry = activeKeys[j] - val r = rightEntry.id - - if (l < r) { - i++ - } else if (l > r) { - j++ - } else { - if (++intersection > 1) { - rightEntry.addGroup(group) - } else { - participants.add(rightEntry) - } - - i++ - j++ - } - } - - while (true) { - val participant = participants.poll() ?: break - - if (intersection <= 1) { - participant.removeGroup(group) - } else { - participant.addGroup(group) - } - } - } - } - } - - /** - * An interference key. - * - * @param id The identifier of the member. - */ - private class InterferenceMemberImpl( - @JvmField val id: Int, - private val domain: InterferenceDomainImpl, - @JvmField val membership: IntArray, - private val members: Array, - private val targets: DoubleArray, - private val scores: DoubleArray - ) : VmInterferenceMember, Comparable { - /** - * The active groups to which the key belongs. - */ - private var groups: IntArray = IntArray(2) - private var groupsSize: Int = 0 - - /** - * The number of users of the interference key. - */ - private var refCount: Int = 0 - - override fun activate() { - if (refCount++ <= 0) { - domain.join(this) - } - } - - override fun deactivate() { - if (--refCount <= 0) { - domain.leave(this) - } - } - - override fun apply(random: SplittableRandom, load: Double): Double { - val groupsSize = groupsSize - - if (groupsSize == 0) { - return 1.0 - } - - val groups = groups - val targets = targets - - var low = 0 - var high = groupsSize - 1 - var group = -1 - - // Perform binary search over the groups based on target load - while (low <= high) { - val mid = low + high ushr 1 - val midGroup = groups[mid] - val target = targets[midGroup] - - if (target < load) { - low = mid + 1 - group = midGroup - } else if (target > load) { - high = mid - 1 - } else { - group = midGroup - break - } - } - - return if (group >= 0 && random.nextInt(members[group].size) == 0) { - scores[group] - } else { - 1.0 - } - } - - /** - * Add an active group to this member. - */ - fun addGroup(group: Int) { - var groups = groups - val groupsSize = groupsSize - val pos = groups.binarySearch(group, toIndex = groupsSize) - - if (pos >= 0) { - return - } - - val idx = -pos - 1 - - if (groups.size == groupsSize) { - val newSize = groupsSize + (groupsSize shr 1) - groups = groups.copyOf(newSize) - this.groups = groups - } - - groups.copyInto(groups, idx + 1, idx, groupsSize) - groups[idx] = group - this.groupsSize += 1 - } - - /** - * Remove an active group from this member. - */ - fun removeGroup(group: Int) { - val groups = groups - val groupsSize = groupsSize - val pos = groups.binarySearch(group, toIndex = groupsSize) - - if (pos < 0) { - return - } - - groups.copyInto(groups, pos, pos + 1, groupsSize) - this.groupsSize -= 1 - } - - override fun compareTo(other: InterferenceMemberImpl): Int = id.compareTo(other.id) - } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt new file mode 100644 index 00000000..004dbd07 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +/** + * A profile of a particular virtual machine describing its interference pattern with other virtual machines. + * + * @param model The model to which this profile belongs. + * @property id The identifier of the profile inside the model. + * @property membership The membership of the profile in the groups. + * @param members The members in the model. + * @param targets The targets in the model. + * @param scores The scores in the model. + */ +public class VmInterferenceProfile internal constructor( + private val model: VmInterferenceModel, + private val id: Int, + private val membership: IntArray, + private val members: Array, + private val targets: DoubleArray, + private val scores: DoubleArray +) { + /** + * Create a new [VmInterferenceMember] based on this profile for the specified [domain]. + */ + internal fun newMember(domain: VmInterferenceDomain): VmInterferenceMember { + return VmInterferenceMember(domain, model, id, membership, members, targets, scores) + } + + override fun toString(): String = "VmInterferenceProfile[id=$id]" +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 23d832e8..d401f8b5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -79,7 +79,7 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, PerformanceScalingGovernor(), random) + val hypervisor = SimFairShareHypervisor(platform, random, PerformanceScalingGovernor()) launch { machine.runWorkload(hypervisor) @@ -131,7 +131,7 @@ internal class SimFairShareHypervisorTest { platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, null, random) + val hypervisor = SimFairShareHypervisor(platform, random, null) launch { machine.runWorkload(hypervisor) @@ -171,7 +171,7 @@ internal class SimFairShareHypervisorTest { val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, null, random) + val hypervisor = SimFairShareHypervisor(platform, random, null) assertDoesNotThrow { launch { @@ -195,14 +195,13 @@ internal class SimFairShareHypervisorTest { .addGroup(targetLoad = 0.0, score = 0.6, members = setOf("a", "c")) .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) .build() - val interferenceDomain = interferenceModel.newDomain() - val platform = FlowEngine(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, null, random) + val hypervisor = SimFairShareHypervisor(engine, random, null) val duration = 5 * 60L val workloadA = @@ -230,12 +229,12 @@ internal class SimFairShareHypervisorTest { coroutineScope { launch { - val vm = hypervisor.newMachine(model, interferenceDomain.getMember("a")) - vm.runWorkload(workloadA) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA, meta = mapOf("interference-model" to interferenceModel.getProfile("a")!!)) hypervisor.removeMachine(vm) } - val vm = hypervisor.newMachine(model, interferenceDomain.getMember("b")) - vm.runWorkload(workloadB) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadB, meta = mapOf("interference-model" to interferenceModel.getProfile("b")!!)) hypervisor.removeMachine(vm) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 9471f548..9b31acf4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -77,7 +77,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, null, random) + val hypervisor = SimSpaceSharedHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } val vm = hypervisor.newMachine(machineModel) @@ -100,7 +100,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, null, random) + val hypervisor = SimSpaceSharedHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } yield() @@ -125,7 +125,7 @@ internal class SimSpaceSharedHypervisorTest { engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, null, random) + val hypervisor = SimSpaceSharedHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } yield() @@ -147,7 +147,7 @@ internal class SimSpaceSharedHypervisorTest { engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, null, random) + val hypervisor = SimSpaceSharedHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } yield() @@ -175,7 +175,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, null, random) + val hypervisor = SimSpaceSharedHypervisor(engine, random, null) launch { machine.runWorkload(hypervisor) } yield() @@ -200,7 +200,7 @@ internal class SimSpaceSharedHypervisorTest { interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(interpreter, null, random) + val hypervisor = SimSpaceSharedHypervisor(interpreter, random, null) launch { machine.runWorkload(hypervisor) } yield() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index d5dbed1c..b7e550ef 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -208,7 +208,7 @@ public class OpenDCRunner( val phenomena = scenario.phenomena val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder) val workload = trace(workloadName).sampleByLoad(workloadFraction) - val (vms, interferenceModel) = workload.resolve(workloadLoader, seeder) + val vms = workload.resolve(workloadLoader, seeder) val failureModel = if (phenomena.failures) @@ -221,8 +221,7 @@ public class OpenDCRunner( clock, computeScheduler, seed = 0L, - failureModel, - interferenceModel.takeIf { phenomena.interference } + failureModel ) val servers = mutableListOf() val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) @@ -231,7 +230,7 @@ public class OpenDCRunner( // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, servers) + simulator.run(vms, servers, interference = phenomena.interference) val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { -- cgit v1.2.3 From 92787292269783701cb7f1082f0262e7e2851df9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Sep 2022 22:28:44 +0200 Subject: refactor(sim/compute): Simplify SimHypervisor class This change simplifies the SimHypervisor class into a single implementation. Previously, it was implemented as an abstract class with multiple implementations for each multiplexer type. We now pass the multiplexer type as parameter to the SimHypervisor constructor. --- .../org/opendc/compute/simulator/SimHostTest.kt | 7 +- .../compute/workload/ComputeServiceHelper.kt | 4 +- .../opendc/compute/workload/topology/HostSpec.kt | 7 +- .../simulator/compute/SimMachineBenchmarks.kt | 25 +- .../compute/kernel/SimAbstractHypervisor.kt | 415 --------------------- .../compute/kernel/SimFairShareHypervisor.kt | 56 --- .../kernel/SimFairShareHypervisorProvider.kt | 42 --- .../simulator/compute/kernel/SimHypervisor.kt | 383 ++++++++++++++++++- .../compute/kernel/SimHypervisorProvider.kt | 51 --- .../compute/kernel/SimSpaceSharedHypervisor.kt | 52 --- .../kernel/SimSpaceSharedHypervisorProvider.kt | 42 --- .../compute/kernel/SimFairShareHypervisorTest.kt | 31 +- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 37 +- .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 10 + .../simulator/flow/mux/FlowMultiplexerFactory.kt | 60 +++ .../flow/mux/ForwardingFlowMultiplexer.kt | 6 + .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 5 + .../opendc/workflow/service/WorkflowServiceTest.kt | 4 +- 18 files changed, 495 insertions(+), 742 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 879f15b2..a7993291 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -31,7 +31,7 @@ import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.SimFairShareHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.time.Instant import java.util.* import kotlin.coroutines.resume @@ -71,7 +72,7 @@ internal class SimHostTest { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(engine, SplittableRandom(1), null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", @@ -155,7 +156,7 @@ internal class SimHostTest { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(engine, SplittableRandom(1), null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index ad132efe..3be0217c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -32,7 +32,7 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock @@ -175,7 +175,7 @@ public class ComputeServiceHelper( */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) - val hypervisor = spec.hypervisor.create(engine, random, interferenceDomain = VmInterferenceDomain()) + val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random) val host = SimHost( spec.uid, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt index f3dc1e9e..87530f5a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt @@ -22,10 +22,9 @@ package org.opendc.compute.workload.topology -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.SimHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** @@ -36,7 +35,7 @@ import java.util.* * @param meta The metadata of the host. * @param model The physical model of the machine. * @param powerDriver The [PowerDriver] to model the power consumption of the machine. - * @param hypervisor The hypervisor implementation to use. + * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. */ public data class HostSpec( val uid: UUID, @@ -44,5 +43,5 @@ public data class HostSpec( val meta: Map, val model: MachineModel, val powerDriver: PowerDriver, - val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider() + val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() ) diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index e862e4d1..797d424e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -24,8 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import org.opendc.simulator.compute.kernel.SimFairShareHypervisor -import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -36,6 +35,7 @@ import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import org.openjdk.jmh.annotations.* import java.util.SplittableRandom import java.util.concurrent.ThreadLocalRandom @@ -83,11 +83,8 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } @@ -106,11 +103,8 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } @@ -129,11 +123,8 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt deleted file mode 100644 index 77088b74..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember -import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.mux.FlowMultiplexer -import java.util.SplittableRandom -import kotlin.math.roundToLong - -/** - * Abstract implementation of the [SimHypervisor] interface. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param random A randomness generator for the interference calculations. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - * @param interferenceDomain The interference domain to which the hypervisor belongs. - */ -public abstract class SimAbstractHypervisor( - protected val engine: FlowEngine, - private val random: SplittableRandom, - private val scalingGovernor: ScalingGovernor?, - private val interferenceDomain: VmInterferenceDomain -) : SimHypervisor, FlowConvergenceListener { - /** - * The machine on which the hypervisor runs. - */ - protected lateinit var context: SimMachineContext - - /** - * The resource switch to use. - */ - protected abstract val mux: FlowMultiplexer - - /** - * The virtual machines running on this hypervisor. - */ - private val _vms = mutableSetOf() - override val vms: Set - get() = _vms - - /** - * The resource counters associated with the hypervisor. - */ - public override val counters: SimHypervisorCounters - get() = _counters - private val _counters = CountersImpl(this) - - /** - * The CPU capacity of the hypervisor in MHz. - */ - override val cpuCapacity: Double - get() = mux.capacity - - /** - * The CPU demand of the hypervisor in MHz. - */ - override val cpuDemand: Double - get() = mux.demand - - /** - * The CPU usage of the hypervisor in MHz. - */ - override val cpuUsage: Double - get() = mux.rate - - /** - * The scaling governors attached to the physical CPUs backing this hypervisor. - */ - private val governors = mutableListOf() - - /* SimHypervisor */ - override fun newMachine(model: MachineModel): SimVirtualMachine { - require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model) - _vms.add(vm) - return vm - } - - override fun removeMachine(machine: SimVirtualMachine) { - if (_vms.remove(machine)) { - // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. - (machine as VirtualMachine).close() - } - } - - /* SimWorkload */ - override fun onStart(ctx: SimMachineContext) { - context = ctx - - _cpuCount = ctx.cpus.size - _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } - _counters.d = _cpuCount / _cpuCapacity * 1000L - - // Clear the existing outputs of the multiplexer - mux.clearOutputs() - - for (cpu in ctx.cpus) { - val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) - if (governor != null) { - governors.add(governor) - governor.onStart() - } - - cpu.startConsumer(mux.newOutput()) - } - } - - override fun onStop(ctx: SimMachineContext) {} - - private var _cpuCount = 0 - private var _cpuCapacity = 0.0 - private var _lastConverge = engine.clock.millis() - - /* FlowConvergenceListener */ - override fun onConverge(now: Long) { - val lastConverge = _lastConverge - _lastConverge = now - val delta = now - lastConverge - - if (delta > 0) { - _counters.record() - - val mux = mux - val load = mux.rate / mux.capacity.coerceAtLeast(1.0) - val random = random - - for (vm in _vms) { - vm._counters.record(random, load) - } - } - - val load = cpuDemand / cpuCapacity - for (governor in governors) { - governor.onLimit(load) - } - } - - /** - * A virtual machine running on the hypervisor. - * - * @param model The machine model of the virtual machine. - */ - private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { - /** - * A flag to indicate that the machine is closed. - */ - private var isClosed = false - - /** - * The vCPUs of the machine. - */ - override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } - - /** - * The resource counters associated with the hypervisor. - */ - override val counters: SimHypervisorCounters - get() = _counters - @JvmField val _counters = VmCountersImpl(cpus, null) - - /** - * The CPU capacity of the hypervisor in MHz. - */ - override val cpuCapacity: Double - get() = cpus.sumOf(FlowConsumer::capacity) - - /** - * The CPU demand of the hypervisor in MHz. - */ - override val cpuDemand: Double - get() = cpus.sumOf(FlowConsumer::demand) - - /** - * The CPU usage of the hypervisor in MHz. - */ - override val cpuUsage: Double - get() = cpus.sumOf(FlowConsumer::rate) - - override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { - check(!isClosed) { "Machine is closed" } - - val profile = meta["interference-profile"] as? VmInterferenceProfile - val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null - - val counters = _counters - counters.member = interferenceMember - - return super.startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - interferenceMember?.activate() - workload.onStart(ctx) - } catch (cause: Throwable) { - interferenceMember?.deactivate() - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - interferenceMember?.deactivate() - counters.member = null - workload.onStop(ctx) - } - }, - meta - ) - } - - override fun close() { - if (isClosed) { - return - } - - isClosed = true - cancel() - - for (cpu in cpus) { - cpu.close() - } - } - } - - /** - * A [SimProcessingUnit] of a virtual machine. - */ - private class VCpu( - private val switch: FlowMultiplexer, - private val source: FlowConsumer, - override val model: ProcessingUnit - ) : SimProcessingUnit, FlowConsumer by source { - override var capacity: Double - get() = source.capacity - set(_) = TODO("Capacity changes on vCPU not supported") - - override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" - - /** - * Close the CPU - */ - fun close() { - switch.removeInput(source) - } - - fun flush() { - switch.flushCounters(source) - } - } - - /** - * A [ScalingPolicy] for a physical CPU of the hypervisor. - */ - private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy { - override var target: Double - get() = cpu.capacity - set(value) { - cpu.capacity = value - } - - override val max: Double = cpu.model.frequency - - override val min: Double = 0.0 - } - - /** - * Implementation of [SimHypervisorCounters]. - */ - private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters { - @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity - - override val cpuActiveTime: Long - get() = _cpuTime[0] - override val cpuIdleTime: Long - get() = _cpuTime[1] - override val cpuStealTime: Long - get() = _cpuTime[2] - override val cpuLostTime: Long - get() = _cpuTime[3] - - val _cpuTime = LongArray(4) - private val _previous = DoubleArray(3) - - /** - * Record the CPU time of the hypervisor. - */ - fun record() { - val cpuTime = _cpuTime - val previous = _previous - val counters = hv.mux.counters - - val demand = counters.demand - val actual = counters.actual - val remaining = counters.remaining - - val demandDelta = demand - previous[0] - val actualDelta = actual - previous[1] - val remainingDelta = remaining - previous[2] - - previous[0] = demand - previous[1] = actual - previous[2] = remaining - - cpuTime[0] += (actualDelta * d).roundToLong() - cpuTime[1] += (remainingDelta * d).roundToLong() - cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - } - - override fun flush() { - hv.mux.flushCounters() - record() - } - } - - /** - * A [SimHypervisorCounters] implementation for a virtual machine. - */ - private inner class VmCountersImpl( - private val cpus: List, - @JvmField var member: VmInterferenceMember? - ) : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = _cpuTime[0] - override val cpuIdleTime: Long - get() = _cpuTime[1] - override val cpuStealTime: Long - get() = _cpuTime[2] - override val cpuLostTime: Long - get() = _cpuTime[3] - - private val _cpuTime = LongArray(4) - private val _previous = DoubleArray(3) - - /** - * Record the CPU time of the hypervisor. - */ - fun record(random: SplittableRandom, load: Double) { - val cpuTime = _cpuTime - val previous = _previous - - var demand = 0.0 - var actual = 0.0 - var remaining = 0.0 - - for (cpu in cpus) { - val counters = cpu.counters - - actual += counters.actual - demand += counters.demand - remaining += counters.remaining - } - - val demandDelta = demand - previous[0] - val actualDelta = actual - previous[1] - val remainingDelta = remaining - previous[2] - - previous[0] = demand - previous[1] = actual - previous[2] = remaining - - val d = d - cpuTime[0] += (actualDelta * d).roundToLong() - cpuTime[1] += (remainingDelta * d).roundToLong() - cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - - // Compute the performance penalty due to flow interference - val member = member - if (member != null) { - val penalty = 1 - member.apply(random, load) - val interference = (actualDelta * d * penalty).roundToLong() - - if (interference > 0) { - cpuTime[3] += interference - _counters._cpuTime[3] += interference - } - } - } - - override fun flush() { - for (cpu in cpus) { - cpu.flush() - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt deleted file mode 100644 index fbee46da..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexer -import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer -import java.util.SplittableRandom - -/** - * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] - * concurrently using weighted fair sharing. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param random A randomness generator for the interference calculations. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - * @param interferenceDomain The interference domain to which the hypervisor belongs. - */ -public class SimFairShareHypervisor( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() -) : SimAbstractHypervisor(engine, random, scalingGovernor, interferenceDomain) { - /** - * The multiplexer that distributes the computing capacity. - */ - override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this) - - override fun canFit(model: MachineModel): Boolean = true -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt deleted file mode 100644 index 81dfc43d..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowEngine -import java.util.* - -/** - * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. - */ -public class SimFairShareHypervisorProvider : SimHypervisorProvider { - override val id: String = "fair-share" - - override fun create( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain, - ): SimHypervisor = SimFairShareHypervisor(engine, random, scalingGovernor, interferenceDomain) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index d8e4e7cd..7594bf4d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -22,56 +22,415 @@ package org.opendc.simulator.compute.kernel -import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import java.util.SplittableRandom +import kotlin.math.roundToLong /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload * to another [SimMachine]. + * + * @param engine The [FlowEngine] to drive the simulation. + * @param muxFactory The factor for the [FlowMultiplexer] to multiplex the workloads. + * @param random A randomness generator for the interference calculations. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param interferenceDomain The interference domain to which the hypervisor belongs. */ -public interface SimHypervisor : SimWorkload { +public class SimHypervisor( + private val engine: FlowEngine, + muxFactory: FlowMultiplexerFactory, + private val random: SplittableRandom, + private val scalingGovernor: ScalingGovernor? = null, + private val interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() +) : SimWorkload, FlowConvergenceListener { + /** + * The [FlowMultiplexer] to multiplex the virtual machines. + */ + private val mux = muxFactory.newMultiplexer(engine, this) + /** - * The machines running on the hypervisor. + * The virtual machines running on this hypervisor. */ + private val _vms = mutableSetOf() public val vms: Set + get() = _vms /** * The resource counters associated with the hypervisor. */ public val counters: SimHypervisorCounters + get() = _counters + private val _counters = CountersImpl(this) /** - * The CPU usage of the hypervisor in MHz. + * The CPU capacity of the hypervisor in MHz. */ - public val cpuUsage: Double + public val cpuCapacity: Double + get() = mux.capacity /** - * The CPU usage of the hypervisor in MHz. + * The CPU demand of the hypervisor in MHz. */ public val cpuDemand: Double + get() = mux.demand /** - * The CPU capacity of the hypervisor in MHz. + * The CPU usage of the hypervisor in MHz. */ - public val cpuCapacity: Double + public val cpuUsage: Double + get() = mux.rate /** - * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + * The machine on which the hypervisor runs. + */ + private lateinit var context: SimMachineContext + + /** + * The scaling governors attached to the physical CPUs backing this hypervisor. */ - public fun canFit(model: MachineModel): Boolean + private val governors = mutableListOf() + /* SimHypervisor */ /** * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. */ - public fun newMachine(model: MachineModel): SimVirtualMachine + public fun newMachine(model: MachineModel): SimVirtualMachine { + require(canFit(model)) { "Machine does not fit" } + val vm = VirtualMachine(model) + _vms.add(vm) + return vm + } /** * Remove the specified [machine] from the hypervisor. * * @param machine The machine to remove. */ - public fun removeMachine(machine: SimVirtualMachine) + public fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + + /** + * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + */ + public fun canFit(model: MachineModel): Boolean { + return (mux.maxInputs - mux.inputs.size) >= model.cpus.size + } + + /* SimWorkload */ + override fun onStart(ctx: SimMachineContext) { + context = ctx + + _cpuCount = ctx.cpus.size + _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } + _counters.d = _cpuCount / _cpuCapacity * 1000L + + // Clear the existing outputs of the multiplexer + mux.clearOutputs() + + for (cpu in ctx.cpus) { + val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) + if (governor != null) { + governors.add(governor) + governor.onStart() + } + + cpu.startConsumer(mux.newOutput()) + } + } + + override fun onStop(ctx: SimMachineContext) {} + + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + private var _lastConverge = engine.clock.millis() + + /* FlowConvergenceListener */ + override fun onConverge(now: Long) { + val lastConverge = _lastConverge + _lastConverge = now + val delta = now - lastConverge + + if (delta > 0) { + _counters.record() + + val mux = mux + val load = mux.rate / mux.capacity.coerceAtLeast(1.0) + val random = random + + for (vm in _vms) { + vm._counters.record(random, load) + } + } + + val load = cpuDemand / cpuCapacity + for (governor in governors) { + governor.onLimit(load) + } + } + + /** + * A virtual machine running on the hypervisor. + * + * @param model The machine model of the virtual machine. + */ + private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + + /** + * The vCPUs of the machine. + */ + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } + + /** + * The resource counters associated with the hypervisor. + */ + override val counters: SimHypervisorCounters + get() = _counters + @JvmField val _counters = VmCountersImpl(cpus, null) + + /** + * The CPU capacity of the hypervisor in MHz. + */ + override val cpuCapacity: Double + get() = cpus.sumOf(FlowConsumer::capacity) + + /** + * The CPU demand of the hypervisor in MHz. + */ + override val cpuDemand: Double + get() = cpus.sumOf(FlowConsumer::demand) + + /** + * The CPU usage of the hypervisor in MHz. + */ + override val cpuUsage: Double + get() = cpus.sumOf(FlowConsumer::rate) + + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + val profile = meta["interference-profile"] as? VmInterferenceProfile + val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null + + val counters = _counters + counters.member = interferenceMember + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + interferenceMember?.activate() + workload.onStart(ctx) + } catch (cause: Throwable) { + interferenceMember?.deactivate() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + interferenceMember?.deactivate() + counters.member = null + workload.onStop(ctx) + } + }, + meta + ) + } + + override fun close() { + if (isClosed) { + return + } + + isClosed = true + cancel() + + for (cpu in cpus) { + cpu.close() + } + } + } + + /** + * A [SimProcessingUnit] of a virtual machine. + */ + private class VCpu( + private val switch: FlowMultiplexer, + private val source: FlowConsumer, + override val model: ProcessingUnit + ) : SimProcessingUnit, FlowConsumer by source { + override var capacity: Double + get() = source.capacity + set(_) = TODO("Capacity changes on vCPU not supported") + + override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" + + /** + * Close the CPU + */ + fun close() { + switch.removeInput(source) + } + + fun flush() { + switch.flushCounters(source) + } + } + + /** + * A [ScalingPolicy] for a physical CPU of the hypervisor. + */ + private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy { + override var target: Double + get() = cpu.capacity + set(value) { + cpu.capacity = value + } + + override val max: Double = cpu.model.frequency + + override val min: Double = 0.0 + } + + /** + * Implementation of [SimHypervisorCounters]. + */ + private class CountersImpl(private val hv: SimHypervisor) : SimHypervisorCounters { + @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + val counters = hv.mux.counters + + val demand = counters.demand + val actual = counters.actual + val remaining = counters.remaining + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + } + + override fun flush() { + hv.mux.flushCounters() + record() + } + } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private inner class VmCountersImpl( + private val cpus: List, + @JvmField var member: VmInterferenceMember? + ) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record(random: SplittableRandom, load: Double) { + val cpuTime = _cpuTime + val previous = _previous + + var demand = 0.0 + var actual = 0.0 + var remaining = 0.0 + + for (cpu in cpus) { + val counters = cpu.counters + + actual += counters.actual + demand += counters.demand + remaining += counters.remaining + } + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + val d = d + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + + // Compute the performance penalty due to flow interference + val member = member + if (member != null) { + val penalty = 1 - member.apply(random, load) + val interference = (actualDelta * d * penalty).roundToLong() + + if (interference > 0) { + cpuTime[3] += interference + _counters._cpuTime[3] += interference + } + } + } + + override fun flush() { + for (cpu in cpus) { + cpu.flush() + } + } + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt deleted file mode 100644 index 2c86854e..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowEngine -import java.util.SplittableRandom - -/** - * A service provider interface for constructing a [SimHypervisor]. - */ -public interface SimHypervisorProvider { - /** - * A unique identifier for this hypervisor implementation. - * - * Each hypervisor must provide a unique ID, so that they can be selected by the user. - * When in doubt, you may use the fully qualified name of your custom [SimHypervisor] implementation class. - */ - public val id: String - - /** - * Create a new [SimHypervisor] instance. - */ - public fun create( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor? = null, - interferenceDomain: VmInterferenceDomain - ): SimHypervisor -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt deleted file mode 100644 index c32dd027..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexer -import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer -import java.util.SplittableRandom - -/** - * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param random A randomness generator for the interference calculations. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - * @param interferenceDomain The interference domain to which the hypervisor belongs. - */ -public class SimSpaceSharedHypervisor( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() -) : SimAbstractHypervisor(engine, random, scalingGovernor, interferenceDomain) { - override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) - - override fun canFit(model: MachineModel): Boolean { - return mux.outputs.size - mux.inputs.size >= model.cpus.size - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt deleted file mode 100644 index cc303bbd..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowEngine -import java.util.* - -/** - * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. - */ -public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { - override val id: String = "space-shared" - - override fun create( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain, - ): SimHypervisor = SimSpaceSharedHypervisor(engine, random, scalingGovernor, interferenceDomain) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index d401f8b5..ddf8cf14 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** @@ -76,10 +77,9 @@ internal class SimFairShareHypervisorTest { ), ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, random, PerformanceScalingGovernor()) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), PerformanceScalingGovernor()) launch { machine.runWorkload(hypervisor) @@ -126,12 +126,9 @@ internal class SimFairShareHypervisorTest { ) ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, random, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) @@ -168,10 +165,9 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, random, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) assertDoesNotThrow { launch { @@ -197,11 +193,8 @@ internal class SimFairShareHypervisorTest { .build() val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val duration = 5 * 60L val workloadA = diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 9b31acf4..df6755f1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute.kernel -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* @@ -40,12 +39,12 @@ import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** - * A test suite for the [SimSpaceSharedHypervisor]. + * A test suite for a space-shared [SimHypervisor]. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimSpaceSharedHypervisorTest { private lateinit var machineModel: MachineModel @@ -76,8 +75,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } val vm = hypervisor.newMachine(machineModel) @@ -99,8 +97,7 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimRuntimeWorkload(duration) val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -121,11 +118,8 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -143,11 +137,8 @@ internal class SimSpaceSharedHypervisorTest { fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -174,8 +165,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -195,12 +185,9 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { - val interpreter = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(interpreter, random, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 3b4583e2..8752c559 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -30,6 +30,16 @@ import org.opendc.simulator.flow.FlowSource * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. */ public interface FlowMultiplexer { + /** + * The maximum number of inputs supported by the multiplexer. + */ + public val maxInputs: Int + + /** + * The maximum number of outputs supported by the multiplexer. + */ + public val maxOutputs: Int + /** * The inputs of the multiplexer that can be used to consume sources. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt new file mode 100644 index 00000000..a863e3ad --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.FlowConvergenceListener +import org.opendc.simulator.flow.FlowEngine + +/** + * Factory interface for a [FlowMultiplexer] implementation. + */ +public fun interface FlowMultiplexerFactory { + /** + * Construct a new [FlowMultiplexer] using the specified [engine] and [listener]. + */ + public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer + + public companion object { + /** + * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer]. + */ + private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) } + + /** + * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer]. + */ + private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) } + + /** + * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances. + */ + @JvmStatic + public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY + + /** + * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances. + */ + @JvmStatic + public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index c18b2701..c50e9bbc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -37,6 +37,12 @@ public class ForwardingFlowMultiplexer( private val engine: FlowEngine, private val listener: FlowConvergenceListener? = null ) : FlowMultiplexer, FlowConvergenceListener { + + override val maxInputs: Int + get() = _outputs.size + + override val maxOutputs: Int = Int.MAX_VALUE + override val inputs: Set get() = _inputs private val _inputs = mutableSetOf() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7605f67d..f2a4c1a4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -37,6 +37,11 @@ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, parent: FlowConvergenceListener? = null ) : FlowMultiplexer { + + override val maxInputs: Int = Int.MAX_VALUE + + override val maxOutputs: Int = Int.MAX_VALUE + /** * The inputs of the multiplexer. */ diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index f6fa2134..0fb8b67c 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -33,7 +33,6 @@ import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.topology.HostSpec -import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -41,6 +40,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import org.opendc.trace.Trace import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy @@ -131,7 +131,7 @@ internal class WorkflowServiceTest { emptyMap(), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)), - SimSpaceSharedHypervisorProvider() + FlowMultiplexerFactory.forwardingMultiplexer() ) } } -- cgit v1.2.3 From d97356cf696dedb6c26fc42d9d7c44a977264dcd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Sep 2022 22:39:33 +0200 Subject: refactor(compute): Pass failure model during workload evaluation This change updates the `ComputeServiceHelper` class to provide the failure model via a parameter to the `run` method instead of constructor parameter. This separates the construction of the topology from the simulation of the workload. --- .../main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt | 4 ++-- .../src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt | 3 +-- .../kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 5 ++--- .../src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt | 3 +-- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 3be0217c..4c07b785 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -47,7 +47,6 @@ import kotlin.math.max * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. * @param scheduler [ComputeScheduler] implementation to use for the service. - * @param failureModel A failure model to use for injecting failures. * @param schedulingQuantum The scheduling quantum of the scheduler. */ public class ComputeServiceHelper( @@ -55,7 +54,6 @@ public class ComputeServiceHelper( private val clock: Clock, scheduler: ComputeScheduler, seed: Long, - private val failureModel: FailureModel? = null, schedulingQuantum: Duration = Duration.ofMinutes(5) ) : AutoCloseable { /** @@ -89,12 +87,14 @@ public class ComputeServiceHelper( * @param trace The trace to simulate. * @param servers A list to which the created servers is added. * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). + * @param failureModel A failure model to use for injecting failures. * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun run( trace: List, servers: MutableList? = null, submitImmediately: Boolean = false, + failureModel: FailureModel? = null, interference: Boolean = false, ) { val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 7be09ff5..98702b4c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -74,7 +74,6 @@ public class CapelinRunner( clock, computeScheduler, seed, - failureModel ) val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) @@ -104,7 +103,7 @@ public class CapelinRunner( runner.apply(topology, optimize = true) // Run the workload trace - runner.run(vms, servers, interference = operationalPhenomena.hasInterference) + runner.run(vms, servers, failureModel = failureModel, interference = operationalPhenomena.hasInterference) // Stop the metric collection exporter?.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index af846dd6..bf8c2758 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -226,8 +226,7 @@ class CapelinIntegrationTest { coroutineContext, clock, computeScheduler, - seed, - grid5000(Duration.ofDays(7)) + seed ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) @@ -236,7 +235,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(workload, servers) + simulator.run(workload, servers, failureModel = grid5000(Duration.ofDays(7))) val serviceMetrics = simulator.service.getSchedulerStats() println( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index b7e550ef..570920f3 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -221,7 +221,6 @@ public class OpenDCRunner( clock, computeScheduler, seed = 0L, - failureModel ) val servers = mutableListOf() val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) @@ -230,7 +229,7 @@ public class OpenDCRunner( // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, servers, interference = phenomena.interference) + simulator.run(vms, servers, failureModel = failureModel, interference = phenomena.interference) val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { -- cgit v1.2.3 From 3d5eb562227dcad5a8a60f31b96e6d68f7774fb2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 23 Sep 2022 12:27:09 +0200 Subject: refactor(compute): Provide access to instances in compute service This change updates the interface of `ComputeService` to provide access to the instances (servers) that have been registered with the compute service. This allows metric collectors to query the metrics of the servers that are currently running. --- .../org/opendc/compute/service/ComputeService.kt | 5 +++ .../compute/service/internal/ComputeServiceImpl.kt | 43 +++++++++++++--------- .../compute/workload/ComputeServiceHelper.kt | 13 +------ .../workload/telemetry/ComputeMetricReader.kt | 9 +++-- .../opendc/experiments/capelin/CapelinRunner.kt | 5 +-- .../experiments/capelin/CapelinIntegrationTest.kt | 22 ++++------- .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 6 +-- 7 files changed, 49 insertions(+), 54 deletions(-) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index c0b70268..28ef7c40 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -36,6 +36,11 @@ import kotlin.coroutines.CoroutineContext * The [ComputeService] hosts the API implementation of the OpenDC Compute service. */ public interface ComputeService : AutoCloseable { + /** + * The servers that are registered with the "compute" service. + */ + public val servers: List + /** * The hosts that are registered with the "compute" service. */ diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 52ee780b..519cf6c6 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -91,17 +91,18 @@ internal class ComputeServiceImpl( /** * The registered flavors for this compute service. */ - internal val flavors = mutableMapOf() + internal val flavorById = mutableMapOf() /** * The registered images for this compute service. */ - internal val images = mutableMapOf() + internal val imageById = mutableMapOf() /** * The registered servers for this compute service. */ - private val servers = mutableMapOf() + private val serverById = mutableMapOf() + override val servers: MutableList = mutableListOf() private var maxCores = 0 private var maxMemory = 0L @@ -127,13 +128,13 @@ internal class ComputeServiceImpl( override suspend fun queryFlavors(): List { check(!isClosed) { "Client is already closed" } - return flavors.values.map { ClientFlavor(it) } + return flavorById.values.map { ClientFlavor(it) } } override suspend fun findFlavor(id: UUID): Flavor? { check(!isClosed) { "Client is already closed" } - return flavors[id]?.let { ClientFlavor(it) } + return flavorById[id]?.let { ClientFlavor(it) } } override suspend fun newFlavor( @@ -156,7 +157,7 @@ internal class ComputeServiceImpl( meta ) - flavors[uid] = flavor + flavorById[uid] = flavor return ClientFlavor(flavor) } @@ -164,13 +165,13 @@ internal class ComputeServiceImpl( override suspend fun queryImages(): List { check(!isClosed) { "Client is already closed" } - return images.values.map { ClientImage(it) } + return imageById.values.map { ClientImage(it) } } override suspend fun findImage(id: UUID): Image? { check(!isClosed) { "Client is already closed" } - return images[id]?.let { ClientImage(it) } + return imageById[id]?.let { ClientImage(it) } } override suspend fun newImage(name: String, labels: Map, meta: Map): Image { @@ -179,7 +180,7 @@ internal class ComputeServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val image = InternalImage(this@ComputeServiceImpl, uid, name, labels, meta) - images[uid] = image + imageById[uid] = image return ClientImage(image) } @@ -199,13 +200,14 @@ internal class ComputeServiceImpl( this@ComputeServiceImpl, uid, name, - requireNotNull(flavors[flavor.uid]) { "Unknown flavor" }, - requireNotNull(images[image.uid]) { "Unknown image" }, + requireNotNull(flavorById[flavor.uid]) { "Unknown flavor" }, + requireNotNull(imageById[image.uid]) { "Unknown image" }, labels.toMutableMap(), meta.toMutableMap() ) - servers[uid] = server + serverById[uid] = server + servers.add(server) if (start) { server.start() @@ -217,13 +219,13 @@ internal class ComputeServiceImpl( override suspend fun findServer(id: UUID): Server? { check(!isClosed) { "Client is already closed" } - return servers[id]?.let { ClientServer(it) } + return serverById[id]?.let { ClientServer(it) } } override suspend fun queryServers(): List { check(!isClosed) { "Client is already closed" } - return servers.values.map { ClientServer(it) } + return serverById.values.map { ClientServer(it) } } override fun close() { @@ -263,7 +265,11 @@ internal class ComputeServiceImpl( } override fun lookupHost(server: Server): Host? { - val internal = requireNotNull(servers[server.uid]) { "Invalid server passed to lookupHost" } + if (server is InternalServer) { + return server.host + } + + val internal = requireNotNull(serverById[server.uid]) { "Invalid server passed to lookupHost" } return internal.host } @@ -296,15 +302,16 @@ internal class ComputeServiceImpl( } internal fun delete(flavor: InternalFlavor) { - flavors.remove(flavor.uid) + flavorById.remove(flavor.uid) } internal fun delete(image: InternalImage) { - images.remove(image.uid) + imageById.remove(image.uid) } internal fun delete(server: InternalServer) { - servers.remove(server.uid) + serverById.remove(server.uid) + servers.remove(server) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 4c07b785..f6744123 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,7 +26,6 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield -import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost @@ -59,7 +58,7 @@ public class ComputeServiceHelper( /** * The [ComputeService] that has been configured by the manager. */ - public val service: ComputeService + public val service: ComputeService = ComputeService(context, clock, scheduler, schedulingQuantum) /** * The [FlowEngine] to simulate the hosts. @@ -76,29 +75,23 @@ public class ComputeServiceHelper( */ private val random = SplittableRandom(seed) - init { - val service = createService(scheduler, schedulingQuantum) - this.service = service - } - /** * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. * * @param trace The trace to simulate. - * @param servers A list to which the created servers is added. * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). * @param failureModel A failure model to use for injecting failures. * @param interference A flag to indicate that VM interference needs to be enabled. */ public suspend fun run( trace: List, - servers: MutableList? = null, submitImmediately: Boolean = false, failureModel: FailureModel? = null, interference: Boolean = false, ) { val injector = failureModel?.createInjector(context, clock, service, Random(random.nextLong())) val client = service.newClient() + val clock = clock // Create new image for the virtual machine val image = client.newImage("vm-image") @@ -147,8 +140,6 @@ public class ComputeServiceHelper( meta = meta ) - servers?.add(server) - // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt index 45bd9ab1..a0ec4bd6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -42,7 +42,6 @@ import java.time.Instant * @param scope The [CoroutineScope] to run the reader in. * @param clock The virtual clock. * @param service The [ComputeService] to monitor. - * @param servers The [Server]s to monitor. * @param monitor The monitor to export the metrics to. * @param exportInterval The export interval. */ @@ -50,7 +49,6 @@ public class ComputeMetricReader( scope: CoroutineScope, clock: Clock, private val service: ComputeService, - private val servers: List, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { @@ -76,6 +74,11 @@ public class ComputeMetricReader( */ private val job = scope.launch { val intervalMs = exportInterval.toMillis() + val service = service + val monitor = monitor + val hostTableReaders = hostTableReaders + val serverTableReaders = serverTableReaders + val serviceTableReader = serviceTableReader try { while (isActive) { @@ -91,7 +94,7 @@ public class ComputeMetricReader( reader.reset() } - for (server in servers) { + for (server in service.servers) { val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } reader.record(now) monitor.record(reader) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index 98702b4c..dbb5ced3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import org.opendc.compute.api.Server import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler @@ -78,7 +77,6 @@ public class CapelinRunner( val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - val servers = mutableListOf() val partitions = scenario.partitions + ("seed" to seed.toString()) val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/") val exporter = if (outputPath != null) { @@ -86,7 +84,6 @@ public class CapelinRunner( this, clock, runner.service, - servers, ParquetComputeMonitor( outputPath, partition, @@ -103,7 +100,7 @@ public class CapelinRunner( runner.apply(topology, optimize = true) // Run the workload trace - runner.run(vms, servers, failureModel = failureModel, interference = operationalPhenomena.hasInterference) + runner.run(vms, failureModel = failureModel, interference = operationalPhenomena.hasInterference) // Stop the metric collection exporter?.close() diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index bf8c2758..eae3c993 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,7 +26,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.api.Server import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -90,13 +89,11 @@ class CapelinIntegrationTest { seed, ) val topology = createTopology() - - val servers = mutableListOf() - val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, runner.service, monitor) try { runner.apply(topology) - runner.run(workload, servers) + runner.run(workload) val serviceMetrics = runner.service.getSchedulerStats() println( @@ -140,12 +137,11 @@ class CapelinIntegrationTest { seed, ) val topology = createTopology("single") - val servers = mutableListOf() - val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, runner.service, monitor) try { runner.apply(topology) - runner.run(workload, servers) + runner.run(workload) val serviceMetrics = runner.service.getSchedulerStats() println( @@ -186,12 +182,11 @@ class CapelinIntegrationTest { seed ) val topology = createTopology("single") - val servers = mutableListOf() - val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, simulator.service, monitor) try { simulator.apply(topology) - simulator.run(workload, servers, interference = true) + simulator.run(workload, interference = true) val serviceMetrics = simulator.service.getSchedulerStats() println( @@ -230,12 +225,11 @@ class CapelinIntegrationTest { ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val servers = mutableListOf() - val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, simulator.service, monitor) try { simulator.apply(topology) - simulator.run(workload, servers, failureModel = grid5000(Duration.ofDays(7))) + simulator.run(workload, failureModel = grid5000(Duration.ofDays(7))) val serviceMetrics = simulator.service.getSchedulerStats() println( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 570920f3..9a1319b6 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,7 +23,6 @@ package org.opendc.web.runner import mu.KotlinLogging -import org.opendc.compute.api.Server import org.opendc.compute.workload.* import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.HostSpec @@ -222,14 +221,13 @@ public class OpenDCRunner( computeScheduler, seed = 0L, ) - val servers = mutableListOf() - val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) + val reader = ComputeMetricReader(this, clock, simulator.service, monitor) try { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, servers, failureModel = failureModel, interference = phenomena.interference) + simulator.run(vms, failureModel = failureModel, interference = phenomena.interference) val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { -- cgit v1.2.3