diff options
18 files changed, 495 insertions, 742 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 879f15b2..a7993291 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -31,7 +31,7 @@ import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.SimFairShareHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.time.Instant import java.util.* import kotlin.coroutines.resume @@ -71,7 +72,7 @@ internal class SimHostTest { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(engine, SplittableRandom(1), null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", @@ -155,7 +156,7 @@ internal class SimHostTest { val duration = 5 * 60L val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(engine, SplittableRandom(1), null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val host = SimHost( uid = UUID.randomUUID(), name = "test", diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index ad132efe..3be0217c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -32,7 +32,7 @@ import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.SimBareMetalMachine -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine import java.time.Clock @@ -175,7 +175,7 @@ public class ComputeServiceHelper( */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { val machine = SimBareMetalMachine(engine, spec.model, spec.powerDriver) - val hypervisor = spec.hypervisor.create(engine, random, interferenceDomain = VmInterferenceDomain()) + val hypervisor = SimHypervisor(engine, spec.multiplexerFactory, random) val host = SimHost( spec.uid, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt index f3dc1e9e..87530f5a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/HostSpec.kt @@ -22,10 +22,9 @@ package org.opendc.compute.workload.topology -import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider -import org.opendc.simulator.compute.kernel.SimHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** @@ -36,7 +35,7 @@ import java.util.* * @param meta The metadata of the host. * @param model The physical model of the machine. * @param powerDriver The [PowerDriver] to model the power consumption of the machine. - * @param hypervisor The hypervisor implementation to use. + * @param multiplexerFactory The [FlowMultiplexerFactory] that is used to multiplex the virtual machines over the host. */ public data class HostSpec( val uid: UUID, @@ -44,5 +43,5 @@ public data class HostSpec( val meta: Map<String, Any>, val model: MachineModel, val powerDriver: PowerDriver, - val hypervisor: SimHypervisorProvider = SimFairShareHypervisorProvider() + val multiplexerFactory: FlowMultiplexerFactory = FlowMultiplexerFactory.maxMinMultiplexer() ) diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index e862e4d1..797d424e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -24,8 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import org.opendc.simulator.compute.kernel.SimFairShareHypervisor -import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -36,6 +35,7 @@ import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import org.openjdk.jmh.annotations.* import java.util.SplittableRandom import java.util.concurrent.ThreadLocalRandom @@ -83,11 +83,8 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } @@ -106,11 +103,8 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } @@ -129,11 +123,8 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt deleted file mode 100644 index 77088b74..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember -import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.mux.FlowMultiplexer -import java.util.SplittableRandom -import kotlin.math.roundToLong - -/** - * Abstract implementation of the [SimHypervisor] interface. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param random A randomness generator for the interference calculations. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - * @param interferenceDomain The interference domain to which the hypervisor belongs. - */ -public abstract class SimAbstractHypervisor( - protected val engine: FlowEngine, - private val random: SplittableRandom, - private val scalingGovernor: ScalingGovernor?, - private val interferenceDomain: VmInterferenceDomain -) : SimHypervisor, FlowConvergenceListener { - /** - * The machine on which the hypervisor runs. - */ - protected lateinit var context: SimMachineContext - - /** - * The resource switch to use. - */ - protected abstract val mux: FlowMultiplexer - - /** - * The virtual machines running on this hypervisor. - */ - private val _vms = mutableSetOf<VirtualMachine>() - override val vms: Set<SimMachine> - get() = _vms - - /** - * The resource counters associated with the hypervisor. - */ - public override val counters: SimHypervisorCounters - get() = _counters - private val _counters = CountersImpl(this) - - /** - * The CPU capacity of the hypervisor in MHz. - */ - override val cpuCapacity: Double - get() = mux.capacity - - /** - * The CPU demand of the hypervisor in MHz. - */ - override val cpuDemand: Double - get() = mux.demand - - /** - * The CPU usage of the hypervisor in MHz. - */ - override val cpuUsage: Double - get() = mux.rate - - /** - * The scaling governors attached to the physical CPUs backing this hypervisor. - */ - private val governors = mutableListOf<ScalingGovernor.Logic>() - - /* SimHypervisor */ - override fun newMachine(model: MachineModel): SimVirtualMachine { - require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model) - _vms.add(vm) - return vm - } - - override fun removeMachine(machine: SimVirtualMachine) { - if (_vms.remove(machine)) { - // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. - (machine as VirtualMachine).close() - } - } - - /* SimWorkload */ - override fun onStart(ctx: SimMachineContext) { - context = ctx - - _cpuCount = ctx.cpus.size - _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } - _counters.d = _cpuCount / _cpuCapacity * 1000L - - // Clear the existing outputs of the multiplexer - mux.clearOutputs() - - for (cpu in ctx.cpus) { - val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) - if (governor != null) { - governors.add(governor) - governor.onStart() - } - - cpu.startConsumer(mux.newOutput()) - } - } - - override fun onStop(ctx: SimMachineContext) {} - - private var _cpuCount = 0 - private var _cpuCapacity = 0.0 - private var _lastConverge = engine.clock.millis() - - /* FlowConvergenceListener */ - override fun onConverge(now: Long) { - val lastConverge = _lastConverge - _lastConverge = now - val delta = now - lastConverge - - if (delta > 0) { - _counters.record() - - val mux = mux - val load = mux.rate / mux.capacity.coerceAtLeast(1.0) - val random = random - - for (vm in _vms) { - vm._counters.record(random, load) - } - } - - val load = cpuDemand / cpuCapacity - for (governor in governors) { - governor.onLimit(load) - } - } - - /** - * A virtual machine running on the hypervisor. - * - * @param model The machine model of the virtual machine. - */ - private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { - /** - * A flag to indicate that the machine is closed. - */ - private var isClosed = false - - /** - * The vCPUs of the machine. - */ - override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } - - /** - * The resource counters associated with the hypervisor. - */ - override val counters: SimHypervisorCounters - get() = _counters - @JvmField val _counters = VmCountersImpl(cpus, null) - - /** - * The CPU capacity of the hypervisor in MHz. - */ - override val cpuCapacity: Double - get() = cpus.sumOf(FlowConsumer::capacity) - - /** - * The CPU demand of the hypervisor in MHz. - */ - override val cpuDemand: Double - get() = cpus.sumOf(FlowConsumer::demand) - - /** - * The CPU usage of the hypervisor in MHz. - */ - override val cpuUsage: Double - get() = cpus.sumOf(FlowConsumer::rate) - - override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { - check(!isClosed) { "Machine is closed" } - - val profile = meta["interference-profile"] as? VmInterferenceProfile - val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null - - val counters = _counters - counters.member = interferenceMember - - return super.startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - interferenceMember?.activate() - workload.onStart(ctx) - } catch (cause: Throwable) { - interferenceMember?.deactivate() - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - interferenceMember?.deactivate() - counters.member = null - workload.onStop(ctx) - } - }, - meta - ) - } - - override fun close() { - if (isClosed) { - return - } - - isClosed = true - cancel() - - for (cpu in cpus) { - cpu.close() - } - } - } - - /** - * A [SimProcessingUnit] of a virtual machine. - */ - private class VCpu( - private val switch: FlowMultiplexer, - private val source: FlowConsumer, - override val model: ProcessingUnit - ) : SimProcessingUnit, FlowConsumer by source { - override var capacity: Double - get() = source.capacity - set(_) = TODO("Capacity changes on vCPU not supported") - - override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" - - /** - * Close the CPU - */ - fun close() { - switch.removeInput(source) - } - - fun flush() { - switch.flushCounters(source) - } - } - - /** - * A [ScalingPolicy] for a physical CPU of the hypervisor. - */ - private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy { - override var target: Double - get() = cpu.capacity - set(value) { - cpu.capacity = value - } - - override val max: Double = cpu.model.frequency - - override val min: Double = 0.0 - } - - /** - * Implementation of [SimHypervisorCounters]. - */ - private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters { - @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity - - override val cpuActiveTime: Long - get() = _cpuTime[0] - override val cpuIdleTime: Long - get() = _cpuTime[1] - override val cpuStealTime: Long - get() = _cpuTime[2] - override val cpuLostTime: Long - get() = _cpuTime[3] - - val _cpuTime = LongArray(4) - private val _previous = DoubleArray(3) - - /** - * Record the CPU time of the hypervisor. - */ - fun record() { - val cpuTime = _cpuTime - val previous = _previous - val counters = hv.mux.counters - - val demand = counters.demand - val actual = counters.actual - val remaining = counters.remaining - - val demandDelta = demand - previous[0] - val actualDelta = actual - previous[1] - val remainingDelta = remaining - previous[2] - - previous[0] = demand - previous[1] = actual - previous[2] = remaining - - cpuTime[0] += (actualDelta * d).roundToLong() - cpuTime[1] += (remainingDelta * d).roundToLong() - cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - } - - override fun flush() { - hv.mux.flushCounters() - record() - } - } - - /** - * A [SimHypervisorCounters] implementation for a virtual machine. - */ - private inner class VmCountersImpl( - private val cpus: List<VCpu>, - @JvmField var member: VmInterferenceMember? - ) : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = _cpuTime[0] - override val cpuIdleTime: Long - get() = _cpuTime[1] - override val cpuStealTime: Long - get() = _cpuTime[2] - override val cpuLostTime: Long - get() = _cpuTime[3] - - private val _cpuTime = LongArray(4) - private val _previous = DoubleArray(3) - - /** - * Record the CPU time of the hypervisor. - */ - fun record(random: SplittableRandom, load: Double) { - val cpuTime = _cpuTime - val previous = _previous - - var demand = 0.0 - var actual = 0.0 - var remaining = 0.0 - - for (cpu in cpus) { - val counters = cpu.counters - - actual += counters.actual - demand += counters.demand - remaining += counters.remaining - } - - val demandDelta = demand - previous[0] - val actualDelta = actual - previous[1] - val remainingDelta = remaining - previous[2] - - previous[0] = demand - previous[1] = actual - previous[2] = remaining - - val d = d - cpuTime[0] += (actualDelta * d).roundToLong() - cpuTime[1] += (remainingDelta * d).roundToLong() - cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - - // Compute the performance penalty due to flow interference - val member = member - if (member != null) { - val penalty = 1 - member.apply(random, load) - val interference = (actualDelta * d * penalty).roundToLong() - - if (interference > 0) { - cpuTime[3] += interference - _counters._cpuTime[3] += interference - } - } - } - - override fun flush() { - for (cpu in cpus) { - cpu.flush() - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt deleted file mode 100644 index fbee46da..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexer -import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer -import java.util.SplittableRandom - -/** - * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] - * concurrently using weighted fair sharing. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param random A randomness generator for the interference calculations. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - * @param interferenceDomain The interference domain to which the hypervisor belongs. - */ -public class SimFairShareHypervisor( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() -) : SimAbstractHypervisor(engine, random, scalingGovernor, interferenceDomain) { - /** - * The multiplexer that distributes the computing capacity. - */ - override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this) - - override fun canFit(model: MachineModel): Boolean = true -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt deleted file mode 100644 index 81dfc43d..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowEngine -import java.util.* - -/** - * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. - */ -public class SimFairShareHypervisorProvider : SimHypervisorProvider { - override val id: String = "fair-share" - - override fun create( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain, - ): SimHypervisor = SimFairShareHypervisor(engine, random, scalingGovernor, interferenceDomain) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index d8e4e7cd..7594bf4d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -22,56 +22,415 @@ package org.opendc.simulator.compute.kernel -import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import java.util.SplittableRandom +import kotlin.math.roundToLong /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload * to another [SimMachine]. + * + * @param engine The [FlowEngine] to drive the simulation. + * @param muxFactory The factor for the [FlowMultiplexer] to multiplex the workloads. + * @param random A randomness generator for the interference calculations. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param interferenceDomain The interference domain to which the hypervisor belongs. */ -public interface SimHypervisor : SimWorkload { +public class SimHypervisor( + private val engine: FlowEngine, + muxFactory: FlowMultiplexerFactory, + private val random: SplittableRandom, + private val scalingGovernor: ScalingGovernor? = null, + private val interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() +) : SimWorkload, FlowConvergenceListener { + /** + * The [FlowMultiplexer] to multiplex the virtual machines. + */ + private val mux = muxFactory.newMultiplexer(engine, this) + /** - * The machines running on the hypervisor. + * The virtual machines running on this hypervisor. */ + private val _vms = mutableSetOf<VirtualMachine>() public val vms: Set<SimMachine> + get() = _vms /** * The resource counters associated with the hypervisor. */ public val counters: SimHypervisorCounters + get() = _counters + private val _counters = CountersImpl(this) /** - * The CPU usage of the hypervisor in MHz. + * The CPU capacity of the hypervisor in MHz. */ - public val cpuUsage: Double + public val cpuCapacity: Double + get() = mux.capacity /** - * The CPU usage of the hypervisor in MHz. + * The CPU demand of the hypervisor in MHz. */ public val cpuDemand: Double + get() = mux.demand /** - * The CPU capacity of the hypervisor in MHz. + * The CPU usage of the hypervisor in MHz. */ - public val cpuCapacity: Double + public val cpuUsage: Double + get() = mux.rate /** - * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + * The machine on which the hypervisor runs. + */ + private lateinit var context: SimMachineContext + + /** + * The scaling governors attached to the physical CPUs backing this hypervisor. */ - public fun canFit(model: MachineModel): Boolean + private val governors = mutableListOf<ScalingGovernor.Logic>() + /* SimHypervisor */ /** * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. */ - public fun newMachine(model: MachineModel): SimVirtualMachine + public fun newMachine(model: MachineModel): SimVirtualMachine { + require(canFit(model)) { "Machine does not fit" } + val vm = VirtualMachine(model) + _vms.add(vm) + return vm + } /** * Remove the specified [machine] from the hypervisor. * * @param machine The machine to remove. */ - public fun removeMachine(machine: SimVirtualMachine) + public fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + + /** + * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + */ + public fun canFit(model: MachineModel): Boolean { + return (mux.maxInputs - mux.inputs.size) >= model.cpus.size + } + + /* SimWorkload */ + override fun onStart(ctx: SimMachineContext) { + context = ctx + + _cpuCount = ctx.cpus.size + _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } + _counters.d = _cpuCount / _cpuCapacity * 1000L + + // Clear the existing outputs of the multiplexer + mux.clearOutputs() + + for (cpu in ctx.cpus) { + val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) + if (governor != null) { + governors.add(governor) + governor.onStart() + } + + cpu.startConsumer(mux.newOutput()) + } + } + + override fun onStop(ctx: SimMachineContext) {} + + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + private var _lastConverge = engine.clock.millis() + + /* FlowConvergenceListener */ + override fun onConverge(now: Long) { + val lastConverge = _lastConverge + _lastConverge = now + val delta = now - lastConverge + + if (delta > 0) { + _counters.record() + + val mux = mux + val load = mux.rate / mux.capacity.coerceAtLeast(1.0) + val random = random + + for (vm in _vms) { + vm._counters.record(random, load) + } + } + + val load = cpuDemand / cpuCapacity + for (governor in governors) { + governor.onLimit(load) + } + } + + /** + * A virtual machine running on the hypervisor. + * + * @param model The machine model of the virtual machine. + */ + private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + + /** + * The vCPUs of the machine. + */ + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } + + /** + * The resource counters associated with the hypervisor. + */ + override val counters: SimHypervisorCounters + get() = _counters + @JvmField val _counters = VmCountersImpl(cpus, null) + + /** + * The CPU capacity of the hypervisor in MHz. + */ + override val cpuCapacity: Double + get() = cpus.sumOf(FlowConsumer::capacity) + + /** + * The CPU demand of the hypervisor in MHz. + */ + override val cpuDemand: Double + get() = cpus.sumOf(FlowConsumer::demand) + + /** + * The CPU usage of the hypervisor in MHz. + */ + override val cpuUsage: Double + get() = cpus.sumOf(FlowConsumer::rate) + + override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + val profile = meta["interference-profile"] as? VmInterferenceProfile + val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null + + val counters = _counters + counters.member = interferenceMember + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + interferenceMember?.activate() + workload.onStart(ctx) + } catch (cause: Throwable) { + interferenceMember?.deactivate() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + interferenceMember?.deactivate() + counters.member = null + workload.onStop(ctx) + } + }, + meta + ) + } + + override fun close() { + if (isClosed) { + return + } + + isClosed = true + cancel() + + for (cpu in cpus) { + cpu.close() + } + } + } + + /** + * A [SimProcessingUnit] of a virtual machine. + */ + private class VCpu( + private val switch: FlowMultiplexer, + private val source: FlowConsumer, + override val model: ProcessingUnit + ) : SimProcessingUnit, FlowConsumer by source { + override var capacity: Double + get() = source.capacity + set(_) = TODO("Capacity changes on vCPU not supported") + + override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" + + /** + * Close the CPU + */ + fun close() { + switch.removeInput(source) + } + + fun flush() { + switch.flushCounters(source) + } + } + + /** + * A [ScalingPolicy] for a physical CPU of the hypervisor. + */ + private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy { + override var target: Double + get() = cpu.capacity + set(value) { + cpu.capacity = value + } + + override val max: Double = cpu.model.frequency + + override val min: Double = 0.0 + } + + /** + * Implementation of [SimHypervisorCounters]. + */ + private class CountersImpl(private val hv: SimHypervisor) : SimHypervisorCounters { + @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + val counters = hv.mux.counters + + val demand = counters.demand + val actual = counters.actual + val remaining = counters.remaining + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + } + + override fun flush() { + hv.mux.flushCounters() + record() + } + } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private inner class VmCountersImpl( + private val cpus: List<VCpu>, + @JvmField var member: VmInterferenceMember? + ) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record(random: SplittableRandom, load: Double) { + val cpuTime = _cpuTime + val previous = _previous + + var demand = 0.0 + var actual = 0.0 + var remaining = 0.0 + + for (cpu in cpus) { + val counters = cpu.counters + + actual += counters.actual + demand += counters.demand + remaining += counters.remaining + } + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + val d = d + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + + // Compute the performance penalty due to flow interference + val member = member + if (member != null) { + val penalty = 1 - member.apply(random, load) + val interference = (actualDelta * d * penalty).roundToLong() + + if (interference > 0) { + cpuTime[3] += interference + _counters._cpuTime[3] += interference + } + } + } + + override fun flush() { + for (cpu in cpus) { + cpu.flush() + } + } + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt deleted file mode 100644 index 2c86854e..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowEngine -import java.util.SplittableRandom - -/** - * A service provider interface for constructing a [SimHypervisor]. - */ -public interface SimHypervisorProvider { - /** - * A unique identifier for this hypervisor implementation. - * - * Each hypervisor must provide a unique ID, so that they can be selected by the user. - * When in doubt, you may use the fully qualified name of your custom [SimHypervisor] implementation class. - */ - public val id: String - - /** - * Create a new [SimHypervisor] instance. - */ - public fun create( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor? = null, - interferenceDomain: VmInterferenceDomain - ): SimHypervisor -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt deleted file mode 100644 index c32dd027..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexer -import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer -import java.util.SplittableRandom - -/** - * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param random A randomness generator for the interference calculations. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - * @param interferenceDomain The interference domain to which the hypervisor belongs. - */ -public class SimSpaceSharedHypervisor( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() -) : SimAbstractHypervisor(engine, random, scalingGovernor, interferenceDomain) { - override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) - - override fun canFit(model: MachineModel): Boolean { - return mux.outputs.size - mux.inputs.size >= model.cpus.size - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt deleted file mode 100644 index cc303bbd..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowEngine -import java.util.* - -/** - * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. - */ -public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { - override val id: String = "space-shared" - - override fun create( - engine: FlowEngine, - random: SplittableRandom, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain, - ): SimHypervisor = SimSpaceSharedHypervisor(engine, random, scalingGovernor, interferenceDomain) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index d401f8b5..ddf8cf14 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** @@ -76,10 +77,9 @@ internal class SimFairShareHypervisorTest { ), ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, random, PerformanceScalingGovernor()) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), PerformanceScalingGovernor()) launch { machine.runWorkload(hypervisor) @@ -126,12 +126,9 @@ internal class SimFairShareHypervisorTest { ) ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, random, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) @@ -168,10 +165,9 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(platform, random, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) assertDoesNotThrow { launch { @@ -197,11 +193,8 @@ internal class SimFairShareHypervisorTest { .build() val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimFairShareHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val duration = 5 * 60L val workloadA = diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 9b31acf4..df6755f1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute.kernel -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* @@ -40,12 +39,12 @@ import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import java.util.* /** - * A test suite for the [SimSpaceSharedHypervisor]. + * A test suite for a space-shared [SimHypervisor]. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimSpaceSharedHypervisorTest { private lateinit var machineModel: MachineModel @@ -76,8 +75,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } val vm = hypervisor.newMachine(machineModel) @@ -99,8 +97,7 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimRuntimeWorkload(duration) val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -121,11 +118,8 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -143,11 +137,8 @@ internal class SimSpaceSharedHypervisorTest { fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -174,8 +165,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(engine, random, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -195,12 +185,9 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { - val interpreter = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val random = SplittableRandom(1) - val hypervisor = SimSpaceSharedHypervisor(interpreter, random, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 3b4583e2..8752c559 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -31,6 +31,16 @@ import org.opendc.simulator.flow.FlowSource */ public interface FlowMultiplexer { /** + * The maximum number of inputs supported by the multiplexer. + */ + public val maxInputs: Int + + /** + * The maximum number of outputs supported by the multiplexer. + */ + public val maxOutputs: Int + + /** * The inputs of the multiplexer that can be used to consume sources. */ public val inputs: Set<FlowConsumer> diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt new file mode 100644 index 00000000..a863e3ad --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.FlowConvergenceListener +import org.opendc.simulator.flow.FlowEngine + +/** + * Factory interface for a [FlowMultiplexer] implementation. + */ +public fun interface FlowMultiplexerFactory { + /** + * Construct a new [FlowMultiplexer] using the specified [engine] and [listener]. + */ + public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer + + public companion object { + /** + * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer]. + */ + private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) } + + /** + * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer]. + */ + private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) } + + /** + * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances. + */ + @JvmStatic + public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY + + /** + * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances. + */ + @JvmStatic + public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index c18b2701..c50e9bbc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -37,6 +37,12 @@ public class ForwardingFlowMultiplexer( private val engine: FlowEngine, private val listener: FlowConvergenceListener? = null ) : FlowMultiplexer, FlowConvergenceListener { + + override val maxInputs: Int + get() = _outputs.size + + override val maxOutputs: Int = Int.MAX_VALUE + override val inputs: Set<FlowConsumer> get() = _inputs private val _inputs = mutableSetOf<Input>() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7605f67d..f2a4c1a4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -37,6 +37,11 @@ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, parent: FlowConvergenceListener? = null ) : FlowMultiplexer { + + override val maxInputs: Int = Int.MAX_VALUE + + override val maxOutputs: Int = Int.MAX_VALUE + /** * The inputs of the multiplexer. */ diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index f6fa2134..0fb8b67c 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -33,7 +33,6 @@ import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.topology.HostSpec -import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -41,6 +40,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import org.opendc.trace.Trace import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy @@ -131,7 +131,7 @@ internal class WorkflowServiceTest { emptyMap(), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)), - SimSpaceSharedHypervisorProvider() + FlowMultiplexerFactory.forwardingMultiplexer() ) } } |
