diff options
Diffstat (limited to 'opendc-simulator')
29 files changed, 869 insertions, 1094 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 91e91f9d..797d424e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -22,11 +22,9 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch -import org.opendc.simulator.compute.kernel.SimFairShareHypervisor -import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisor +import org.opendc.simulator.compute.kernel.SimHypervisor import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -37,7 +35,9 @@ import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory import org.openjdk.jmh.annotations.* +import java.util.SplittableRandom import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit @Fork(1) @Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var machineModel: MachineModel private lateinit var trace: SimTrace @@ -64,7 +63,7 @@ class SimMachineBenchmarks { repeat(10000) { val timestamp = it.toLong() val deadline = timestamp + 1000 - builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1) + builder.add(deadline, random.nextDouble(0.0, 4500.0), 1) } trace = builder.build() } @@ -84,10 +83,8 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } @@ -106,10 +103,8 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimFairShareHypervisor(engine, null, null, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } @@ -128,10 +123,8 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble() { return runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimFairShareHypervisor(engine, null, null, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index e14ea507..ef0cd323 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -36,12 +36,10 @@ import org.opendc.simulator.flow.* * Abstract implementation of the [SimMachine] interface. * * @param engine The engine to manage the machine's resources. - * @param parent The parent simulation system. * @param model The model of the machine. */ public abstract class SimAbstractMachine( protected val engine: FlowEngine, - private val parent: FlowConvergenceListener?, final override val model: MachineModel ) : SimMachine, FlowConvergenceListener { /** @@ -86,9 +84,7 @@ public abstract class SimAbstractMachine( _ctx?.close() } - override fun onConverge(now: Long) { - parent?.onConverge(now) - } + override fun onConverge(now: Long) {} /** * The execution context in which the workload runs. @@ -105,7 +101,7 @@ public abstract class SimAbstractMachine( */ private var isClosed = false - override val engine: FlowEngine = this@SimAbstractMachine.engine + val engine: FlowEngine = this@SimAbstractMachine.engine /** * Start this context. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 63a82e78..0df897b6 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -40,15 +40,13 @@ import kotlin.math.max * @param model The machine model to simulate. * @param powerDriver The power driver to use. * @param psu The power supply of the machine. - * @param parent The parent simulation system. */ public class SimBareMetalMachine( engine: FlowEngine, model: MachineModel, powerDriver: PowerDriver, public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)), - parent: FlowConvergenceListener? = null, -) : SimAbstractMachine(engine, parent, model) { +) : SimAbstractMachine(engine, model) { /** * The current power usage of the machine (without PSU loss) in W. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 1317f728..5e3a7766 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.compute -import org.opendc.simulator.flow.FlowEngine - /** * A simulated execution context in which a bootable image runs. This interface represents the * firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on @@ -31,11 +29,6 @@ import org.opendc.simulator.flow.FlowEngine */ public interface SimMachineContext : AutoCloseable { /** - * The [FlowEngine] that simulates the machine. - */ - public val engine: FlowEngine - - /** * The metadata associated with the context. */ public val meta: Map<String, Any> diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt deleted file mode 100644 index 8e925bdf..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceKey -import org.opendc.simulator.flow.mux.FlowMultiplexer -import kotlin.math.roundToLong - -/** - * Abstract implementation of the [SimHypervisor] interface. - * - * @param engine The [FlowEngine] to drive the simulation. - * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. - */ -public abstract class SimAbstractHypervisor( - protected val engine: FlowEngine, - private val listener: FlowConvergenceListener?, - private val scalingGovernor: ScalingGovernor?, - protected val interferenceDomain: VmInterferenceDomain? = null -) : SimHypervisor, FlowConvergenceListener { - /** - * The machine on which the hypervisor runs. - */ - protected lateinit var context: SimMachineContext - - /** - * The resource switch to use. - */ - protected abstract val mux: FlowMultiplexer - - /** - * The virtual machines running on this hypervisor. - */ - private val _vms = mutableSetOf<VirtualMachine>() - override val vms: Set<SimMachine> - get() = _vms - - /** - * The resource counters associated with the hypervisor. - */ - public override val counters: SimHypervisorCounters - get() = _counters - private val _counters = CountersImpl(this) - - /** - * The CPU capacity of the hypervisor in MHz. - */ - override val cpuCapacity: Double - get() = mux.capacity - - /** - * The CPU demand of the hypervisor in MHz. - */ - override val cpuDemand: Double - get() = mux.demand - - /** - * The CPU usage of the hypervisor in MHz. - */ - override val cpuUsage: Double - get() = mux.rate - - /** - * The scaling governors attached to the physical CPUs backing this hypervisor. - */ - private val governors = mutableListOf<ScalingGovernor.Logic>() - - /* SimHypervisor */ - override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { - require(canFit(model)) { "Machine does not fit" } - val vm = VirtualMachine(model, interferenceId) - _vms.add(vm) - return vm - } - - override fun removeMachine(machine: SimVirtualMachine) { - if (_vms.remove(machine)) { - // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. - (machine as VirtualMachine).close() - } - } - - /* SimWorkload */ - override fun onStart(ctx: SimMachineContext) { - context = ctx - - _cpuCount = ctx.cpus.size - _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } - _counters.d = _cpuCount / _cpuCapacity * 1000L - - // Clear the existing outputs of the multiplexer - mux.clearOutputs() - - for (cpu in ctx.cpus) { - val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) - if (governor != null) { - governors.add(governor) - governor.onStart() - } - - cpu.startConsumer(mux.newOutput()) - } - } - - override fun onStop(ctx: SimMachineContext) {} - - private var _cpuCount = 0 - private var _cpuCapacity = 0.0 - private var _lastConverge = engine.clock.millis() - - /* FlowConvergenceListener */ - override fun onConverge(now: Long) { - val lastConverge = _lastConverge - _lastConverge = now - val delta = now - lastConverge - - if (delta > 0) { - _counters.record() - } - - val load = cpuDemand / cpuCapacity - for (governor in governors) { - governor.onLimit(load) - } - - listener?.onConverge(now) - } - - /** - * A virtual machine running on the hypervisor. - * - * @param model The machine model of the virtual machine. - */ - private inner class VirtualMachine( - model: MachineModel, - interferenceId: String? = null - ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { - /** - * A flag to indicate that the machine is closed. - */ - private var isClosed = false - - /** - * The interference key of this virtual machine. - */ - private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } - - /** - * The vCPUs of the machine. - */ - override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } - - /** - * The resource counters associated with the hypervisor. - */ - override val counters: SimHypervisorCounters - get() = _counters - private val _counters = VmCountersImpl(cpus) - - /** - * The CPU capacity of the hypervisor in MHz. - */ - override val cpuCapacity: Double - get() = cpus.sumOf(FlowConsumer::capacity) - - /** - * The CPU demand of the hypervisor in MHz. - */ - override val cpuDemand: Double - get() = cpus.sumOf(FlowConsumer::demand) - - /** - * The CPU usage of the hypervisor in MHz. - */ - override val cpuUsage: Double - get() = cpus.sumOf(FlowConsumer::rate) - - override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { - check(!isClosed) { "Machine is closed" } - - return super.startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - joinInterferenceDomain() - workload.onStart(ctx) - } catch (cause: Throwable) { - leaveInterferenceDomain() - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - leaveInterferenceDomain() - workload.onStop(ctx) - } - }, - meta - ) - } - - override fun close() { - if (isClosed) { - return - } - - isClosed = true - cancel() - - for (cpu in cpus) { - cpu.close() - } - } - - /** - * Join the interference domain of the hypervisor. - */ - private fun joinInterferenceDomain() { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.join(interferenceKey) - } - } - - /** - * Leave the interference domain of the hypervisor. - */ - private fun leaveInterferenceDomain() { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) - } - } - } - - /** - * A [SimProcessingUnit] of a virtual machine. - */ - private class VCpu( - private val switch: FlowMultiplexer, - private val source: FlowConsumer, - override val model: ProcessingUnit - ) : SimProcessingUnit, FlowConsumer by source { - override var capacity: Double - get() = source.capacity - set(_) = TODO("Capacity changes on vCPU not supported") - - override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" - - /** - * Close the CPU - */ - fun close() { - switch.removeInput(source) - } - - fun flush() { - switch.flushCounters(source) - } - } - - /** - * A [ScalingPolicy] for a physical CPU of the hypervisor. - */ - private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy { - override var target: Double - get() = cpu.capacity - set(value) { - cpu.capacity = value - } - - override val max: Double = cpu.model.frequency - - override val min: Double = 0.0 - } - - /** - * Implementation of [SimHypervisorCounters]. - */ - private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters { - @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity - - override val cpuActiveTime: Long - get() = _cpuTime[0] - override val cpuIdleTime: Long - get() = _cpuTime[1] - override val cpuStealTime: Long - get() = _cpuTime[2] - override val cpuLostTime: Long - get() = _cpuTime[3] - - private val _cpuTime = LongArray(4) - private val _previous = DoubleArray(4) - - /** - * Record the CPU time of the hypervisor. - */ - fun record() { - val cpuTime = _cpuTime - val previous = _previous - val counters = hv.mux.counters - - val demand = counters.demand - val actual = counters.actual - val remaining = counters.remaining - val interference = counters.interference - - val demandDelta = demand - previous[0] - val actualDelta = actual - previous[1] - val remainingDelta = remaining - previous[2] - val interferenceDelta = interference - previous[3] - - previous[0] = demand - previous[1] = actual - previous[2] = remaining - previous[3] = interference - - cpuTime[0] += (actualDelta * d).roundToLong() - cpuTime[1] += (remainingDelta * d).roundToLong() - cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - cpuTime[3] += (interferenceDelta * d).roundToLong() - } - - override fun flush() { - hv.mux.flushCounters() - record() - } - } - - /** - * A [SimHypervisorCounters] implementation for a virtual machine. - */ - private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters { - private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 - - override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() - override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong() - override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long - get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() - - override fun flush() { - for (cpu in cpus) { - cpu.flush() - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt deleted file mode 100644 index 36f76650..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.SimMachine -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexer -import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer - -/** - * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] - * concurrently using weighted fair sharing. - * - * @param engine The [FlowEngine] to manage the machine's resources. - * @param listener The listener for the convergence of the system. - * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. - * @param interferenceDomain The resource interference domain to which the hypervisor belongs. - */ -public class SimFairShareHypervisor( - engine: FlowEngine, - listener: FlowConvergenceListener?, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain?, -) : SimAbstractHypervisor(engine, listener, scalingGovernor, interferenceDomain) { - /** - * The multiplexer that distributes the computing capacity. - */ - override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain) - - override fun canFit(model: MachineModel): Boolean = true -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt deleted file mode 100644 index 3136f4c8..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine - -/** - * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. - */ -public class SimFairShareHypervisorProvider : SimHypervisorProvider { - override val id: String = "fair-share" - - override fun create( - engine: FlowEngine, - listener: FlowConvergenceListener?, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain?, - ): SimHypervisor = SimFairShareHypervisor(engine, listener, scalingGovernor, interferenceDomain) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index a69f419f..7594bf4d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -22,57 +22,415 @@ package org.opendc.simulator.compute.kernel -import org.opendc.simulator.compute.SimMachine +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor +import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy +import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember +import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import java.util.SplittableRandom +import kotlin.math.roundToLong /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload * to another [SimMachine]. + * + * @param engine The [FlowEngine] to drive the simulation. + * @param muxFactory The factor for the [FlowMultiplexer] to multiplex the workloads. + * @param random A randomness generator for the interference calculations. + * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. + * @param interferenceDomain The interference domain to which the hypervisor belongs. */ -public interface SimHypervisor : SimWorkload { +public class SimHypervisor( + private val engine: FlowEngine, + muxFactory: FlowMultiplexerFactory, + private val random: SplittableRandom, + private val scalingGovernor: ScalingGovernor? = null, + private val interferenceDomain: VmInterferenceDomain = VmInterferenceDomain() +) : SimWorkload, FlowConvergenceListener { + /** + * The [FlowMultiplexer] to multiplex the virtual machines. + */ + private val mux = muxFactory.newMultiplexer(engine, this) + /** - * The machines running on the hypervisor. + * The virtual machines running on this hypervisor. */ + private val _vms = mutableSetOf<VirtualMachine>() public val vms: Set<SimMachine> + get() = _vms /** * The resource counters associated with the hypervisor. */ public val counters: SimHypervisorCounters + get() = _counters + private val _counters = CountersImpl(this) /** - * The CPU usage of the hypervisor in MHz. + * The CPU capacity of the hypervisor in MHz. */ - public val cpuUsage: Double + public val cpuCapacity: Double + get() = mux.capacity /** - * The CPU usage of the hypervisor in MHz. + * The CPU demand of the hypervisor in MHz. */ public val cpuDemand: Double + get() = mux.demand /** - * The CPU capacity of the hypervisor in MHz. + * The CPU usage of the hypervisor in MHz. */ - public val cpuCapacity: Double + public val cpuUsage: Double + get() = mux.rate /** - * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + * The machine on which the hypervisor runs. + */ + private lateinit var context: SimMachineContext + + /** + * The scaling governors attached to the physical CPUs backing this hypervisor. */ - public fun canFit(model: MachineModel): Boolean + private val governors = mutableListOf<ScalingGovernor.Logic>() + /* SimHypervisor */ /** * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. - * @param interferenceId An identifier for the interference model. */ - public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel): SimVirtualMachine { + require(canFit(model)) { "Machine does not fit" } + val vm = VirtualMachine(model) + _vms.add(vm) + return vm + } /** * Remove the specified [machine] from the hypervisor. * * @param machine The machine to remove. */ - public fun removeMachine(machine: SimVirtualMachine) + public fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + + /** + * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. + */ + public fun canFit(model: MachineModel): Boolean { + return (mux.maxInputs - mux.inputs.size) >= model.cpus.size + } + + /* SimWorkload */ + override fun onStart(ctx: SimMachineContext) { + context = ctx + + _cpuCount = ctx.cpus.size + _cpuCapacity = ctx.cpus.sumOf { it.model.frequency } + _counters.d = _cpuCount / _cpuCapacity * 1000L + + // Clear the existing outputs of the multiplexer + mux.clearOutputs() + + for (cpu in ctx.cpus) { + val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) + if (governor != null) { + governors.add(governor) + governor.onStart() + } + + cpu.startConsumer(mux.newOutput()) + } + } + + override fun onStop(ctx: SimMachineContext) {} + + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + private var _lastConverge = engine.clock.millis() + + /* FlowConvergenceListener */ + override fun onConverge(now: Long) { + val lastConverge = _lastConverge + _lastConverge = now + val delta = now - lastConverge + + if (delta > 0) { + _counters.record() + + val mux = mux + val load = mux.rate / mux.capacity.coerceAtLeast(1.0) + val random = random + + for (vm in _vms) { + vm._counters.record(random, load) + } + } + + val load = cpuDemand / cpuCapacity + for (governor in governors) { + governor.onLimit(load) + } + } + + /** + * A virtual machine running on the hypervisor. + * + * @param model The machine model of the virtual machine. + */ + private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + + /** + * The vCPUs of the machine. + */ + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } + + /** + * The resource counters associated with the hypervisor. + */ + override val counters: SimHypervisorCounters + get() = _counters + @JvmField val _counters = VmCountersImpl(cpus, null) + + /** + * The CPU capacity of the hypervisor in MHz. + */ + override val cpuCapacity: Double + get() = cpus.sumOf(FlowConsumer::capacity) + + /** + * The CPU demand of the hypervisor in MHz. + */ + override val cpuDemand: Double + get() = cpus.sumOf(FlowConsumer::demand) + + /** + * The CPU usage of the hypervisor in MHz. + */ + override val cpuUsage: Double + get() = cpus.sumOf(FlowConsumer::rate) + + override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + val profile = meta["interference-profile"] as? VmInterferenceProfile + val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null + + val counters = _counters + counters.member = interferenceMember + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + interferenceMember?.activate() + workload.onStart(ctx) + } catch (cause: Throwable) { + interferenceMember?.deactivate() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + interferenceMember?.deactivate() + counters.member = null + workload.onStop(ctx) + } + }, + meta + ) + } + + override fun close() { + if (isClosed) { + return + } + + isClosed = true + cancel() + + for (cpu in cpus) { + cpu.close() + } + } + } + + /** + * A [SimProcessingUnit] of a virtual machine. + */ + private class VCpu( + private val switch: FlowMultiplexer, + private val source: FlowConsumer, + override val model: ProcessingUnit + ) : SimProcessingUnit, FlowConsumer by source { + override var capacity: Double + get() = source.capacity + set(_) = TODO("Capacity changes on vCPU not supported") + + override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]" + + /** + * Close the CPU + */ + fun close() { + switch.removeInput(source) + } + + fun flush() { + switch.flushCounters(source) + } + } + + /** + * A [ScalingPolicy] for a physical CPU of the hypervisor. + */ + private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy { + override var target: Double + get() = cpu.capacity + set(value) { + cpu.capacity = value + } + + override val max: Double = cpu.model.frequency + + override val min: Double = 0.0 + } + + /** + * Implementation of [SimHypervisorCounters]. + */ + private class CountersImpl(private val hv: SimHypervisor) : SimHypervisorCounters { + @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + val counters = hv.mux.counters + + val demand = counters.demand + val actual = counters.actual + val remaining = counters.remaining + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + } + + override fun flush() { + hv.mux.flushCounters() + record() + } + } + + /** + * A [SimHypervisorCounters] implementation for a virtual machine. + */ + private inner class VmCountersImpl( + private val cpus: List<VCpu>, + @JvmField var member: VmInterferenceMember? + ) : SimHypervisorCounters { + private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record(random: SplittableRandom, load: Double) { + val cpuTime = _cpuTime + val previous = _previous + + var demand = 0.0 + var actual = 0.0 + var remaining = 0.0 + + for (cpu in cpus) { + val counters = cpu.counters + + actual += counters.actual + demand += counters.demand + remaining += counters.remaining + } + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + val d = d + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + + // Compute the performance penalty due to flow interference + val member = member + if (member != null) { + val penalty = 1 - member.apply(random, load) + val interference = (actualDelta * d * penalty).roundToLong() + + if (interference > 0) { + cpuTime[3] += interference + _counters._cpuTime[3] += interference + } + } + } + + override fun flush() { + for (cpu in cpus) { + cpu.flush() + } + } + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt deleted file mode 100644 index 483217af..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine - -/** - * A service provider interface for constructing a [SimHypervisor]. - */ -public interface SimHypervisorProvider { - /** - * A unique identifier for this hypervisor implementation. - * - * Each hypervisor must provide a unique ID, so that they can be selected by the user. - * When in doubt, you may use the fully qualified name of your custom [SimHypervisor] implementation class. - */ - public val id: String - - /** - * Create a new [SimHypervisor] instance. - */ - public fun create( - engine: FlowEngine, - listener: FlowConvergenceListener? = null, - scalingGovernor: ScalingGovernor? = null, - interferenceDomain: VmInterferenceDomain? = null, - ): SimHypervisor -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt deleted file mode 100644 index 3f3bf6ad..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.mux.FlowMultiplexer -import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer - -/** - * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. - */ -public class SimSpaceSharedHypervisor( - engine: FlowEngine, - listener: FlowConvergenceListener?, - scalingGovernor: ScalingGovernor?, -) : SimAbstractHypervisor(engine, listener, scalingGovernor) { - override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) - - override fun canFit(model: MachineModel): Boolean { - return mux.outputs.size - mux.inputs.size >= model.cpus.size - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt deleted file mode 100644 index dd6fb0b1..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.kernel - -import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor -import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine - -/** - * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. - */ -public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { - override val id: String = "space-shared" - - override fun create( - engine: FlowEngine, - listener: FlowConvergenceListener?, - scalingGovernor: ScalingGovernor?, - interferenceDomain: VmInterferenceDomain?, - ): SimHypervisor = SimSpaceSharedHypervisor(engine, listener, scalingGovernor) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 09b03306..6861823b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,38 +22,110 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.flow.interference.InterferenceDomain -import org.opendc.simulator.flow.interference.InterferenceKey +import java.util.ArrayDeque +import java.util.ArrayList +import java.util.WeakHashMap /** - * The interference domain of a hypervisor. + * A domain where virtual machines may incur performance variability due to operating on the same resource and + * therefore causing interference. */ -public interface VmInterferenceDomain : InterferenceDomain { +public class VmInterferenceDomain { /** - * Construct an [InterferenceKey] for the specified [id]. - * - * @param id The identifier of the virtual machine. - * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine - * does not participate in the domain. + * A cache to maintain a mapping between the active profiles in this domain. */ - public fun createKey(id: String): InterferenceKey? + private val cache = WeakHashMap<VmInterferenceProfile, VmInterferenceMember>() /** - * Remove the specified [key] from this domain. + * The set of members active in this domain. */ - public fun removeKey(key: InterferenceKey) + private val activeKeys = ArrayList<VmInterferenceMember>() /** - * Mark the specified [key] as active in this interference domain. - * - * @param key The key to join the interference domain with. + * Queue of participants that will be removed or added to the active groups. */ - public fun join(key: InterferenceKey) + private val participants = ArrayDeque<VmInterferenceMember>() /** - * Mark the specified [key] as inactive in this interference domain. - * - * @param key The key of the virtual machine that wants to leave the domain. + * Join this interference domain with the specified [profile] and return the [VmInterferenceMember] associated with + * the profile. If the member does not exist, it will be created. */ - public fun leave(key: InterferenceKey) + public fun join(profile: VmInterferenceProfile): VmInterferenceMember { + return cache.computeIfAbsent(profile) { key -> key.newMember(this) } + } + + /** + * Mark the specified [member] as active in this interference domain. + */ + internal fun activate(member: VmInterferenceMember) { + val activeKeys = activeKeys + val pos = activeKeys.binarySearch(member) + if (pos < 0) { + activeKeys.add(-pos - 1, member) + } + + computeActiveGroups(activeKeys, member) + } + + /** + * Mark the specified [member] as inactive in this interference domain. + */ + internal fun deactivate(member: VmInterferenceMember) { + val activeKeys = activeKeys + activeKeys.remove(member) + computeActiveGroups(activeKeys, member) + } + + /** + * (Re-)compute the active groups. + */ + private fun computeActiveGroups(activeKeys: ArrayList<VmInterferenceMember>, member: VmInterferenceMember) { + if (activeKeys.isEmpty()) { + return + } + + val groups = member.membership + val members = member.members + val participants = participants + + for (group in groups) { + val groupMembers = members[group] + + var i = 0 + var j = 0 + var intersection = 0 + + // Compute the intersection of the group members and the current active members + while (i < groupMembers.size && j < activeKeys.size) { + val l = groupMembers[i] + val rightEntry = activeKeys[j] + val r = rightEntry.id + + if (l < r) { + i++ + } else if (l > r) { + j++ + } else { + if (++intersection > 1) { + rightEntry.addGroup(group) + } else { + participants.add(rightEntry) + } + + i++ + j++ + } + } + + while (true) { + val participant = participants.poll() ?: break + + if (intersection <= 1) { + participant.removeGroup(group) + } else { + participant.addGroup(group) + } + } + } + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt new file mode 100644 index 00000000..762bb568 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +import java.util.* + +/** + * A participant of an interference domain. + */ +public class VmInterferenceMember( + private val domain: VmInterferenceDomain, + private val model: VmInterferenceModel, + @JvmField internal val id: Int, + @JvmField internal val membership: IntArray, + @JvmField internal val members: Array<IntArray>, + private val targets: DoubleArray, + private val scores: DoubleArray +) : Comparable<VmInterferenceMember> { + /** + * The active groups to which the key belongs. + */ + private var groups: IntArray = IntArray(2) + private var groupsSize: Int = 0 + + /** + * The number of users of the interference key. + */ + private var refCount: Int = 0 + + /** + * Mark this member as active in this interference domain. + */ + public fun activate() { + if (refCount++ <= 0) { + domain.activate(this) + } + } + + /** + * Mark this member as inactive in this interference domain. + */ + public fun deactivate() { + if (--refCount <= 0) { + domain.deactivate(this) + } + } + + /** + * Compute the performance score of the member in this interference domain. + * + * @param random The source of randomness to apply when computing the performance score. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the member, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(random: SplittableRandom, load: Double): Double { + val groupsSize = groupsSize + + if (groupsSize == 0) { + return 1.0 + } + + val groups = groups + val targets = targets + + var low = 0 + var high = groupsSize - 1 + var group = -1 + + // Perform binary search over the groups based on target load + while (low <= high) { + val mid = low + high ushr 1 + val midGroup = groups[mid] + val target = targets[midGroup] + + if (target < load) { + low = mid + 1 + group = midGroup + } else if (target > load) { + high = mid - 1 + } else { + group = midGroup + break + } + } + + return if (group >= 0 && random.nextInt(members[group].size) == 0) { + scores[group] + } else { + 1.0 + } + } + + /** + * Add an active group to this member. + */ + internal fun addGroup(group: Int) { + var groups = groups + val groupsSize = groupsSize + val pos = groups.binarySearch(group, toIndex = groupsSize) + + if (pos >= 0) { + return + } + + val idx = -pos - 1 + + if (groups.size == groupsSize) { + val newSize = groupsSize + (groupsSize shr 1) + groups = groups.copyOf(newSize) + this.groups = groups + } + + groups.copyInto(groups, idx + 1, idx, groupsSize) + groups[idx] = group + this.groupsSize += 1 + } + + /** + * Remove an active group from this member. + */ + internal fun removeGroup(group: Int) { + val groups = groups + val groupsSize = groupsSize + val pos = groups.binarySearch(group, toIndex = groupsSize) + + if (pos < 0) { + return + } + + groups.copyInto(groups, pos, pos + 1, groupsSize) + this.groupsSize -= 1 + } + + override fun compareTo(other: VmInterferenceMember): Int { + val cmp = model.hashCode().compareTo(other.model.hashCode()) + if (cmp != 0) { + return cmp + } + + return id.compareTo(other.id) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index 977292be..018c6e3d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -22,43 +22,35 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.flow.interference.InterferenceKey import java.util.* /** * An interference model that models the resource interference between virtual machines on a host. * - * @param targets The target load of each group. + * @param members The target load of each group. * @param scores The performance score of each group. * @param members The members belonging to each group. * @param membership The identifier of each key. * @param size The number of groups. - * @param seed The seed to use for randomly selecting the virtual machines that are affected. */ public class VmInterferenceModel private constructor( - private val targets: DoubleArray, - private val scores: DoubleArray, private val idMapping: Map<String, Int>, private val members: Array<IntArray>, private val membership: Array<IntArray>, - private val size: Int, - seed: Long, + private val targets: DoubleArray, + private val scores: DoubleArray, + private val size: Int ) { /** - * A [SplittableRandom] used for selecting the virtual machines that are affected. - */ - private val random = SplittableRandom(seed) - - /** - * Construct a new [VmInterferenceDomain]. - */ - public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random) - - /** - * Create a copy of this model with a different seed. + * Return the [VmInterferenceProfile] associated with the specified [id]. + * + * @param id The identifier of the virtual machine. + * @return A [VmInterferenceProfile] representing the virtual machine as part of interference model or `null` if + * there is no profile for the virtual machine. */ - public fun withSeed(seed: Long): VmInterferenceModel { - return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed) + public fun getProfile(id: String): VmInterferenceProfile? { + val intId = idMapping[id] ?: return null + return VmInterferenceProfile(this, intId, membership[intId], members, targets, scores) } public companion object { @@ -74,11 +66,6 @@ public class VmInterferenceModel private constructor( */ public class Builder internal constructor() { /** - * The initial capacity of the builder. - */ - private val INITIAL_CAPACITY = 256 - - /** * The target load of each group. */ private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY } @@ -126,14 +113,14 @@ public class VmInterferenceModel private constructor( /** * Build the [VmInterferenceModel]. */ - public fun build(seed: Long = 0): VmInterferenceModel { + public fun build(): VmInterferenceModel { val size = size val targets = _targets val scores = _scores val members = _members - val indices = Array(size) { it } - indices.sortWith( + val indices = IntArray(size) { it } + indices.sortedWith( Comparator { l, r -> var cmp = targets[l].compareTo(targets[r]) // Order by target load if (cmp != 0) { @@ -172,13 +159,12 @@ public class VmInterferenceModel private constructor( @Suppress("UNCHECKED_CAST") return VmInterferenceModel( - newTargets, - newScores, idMapping, newMembers as Array<IntArray>, membership.map { it.value.toIntArray() }.toTypedArray(), - size, - seed + newTargets, + newScores, + size ) } @@ -192,202 +178,12 @@ public class VmInterferenceModel private constructor( _targets = _targets.copyOf(newSize) _scores = _scores.copyOf(newSize) } - } - - /** - * Internal implementation of [VmInterferenceDomain]. - */ - private class InterferenceDomainImpl( - private val targets: DoubleArray, - private val scores: DoubleArray, - private val idMapping: Map<String, Int>, - private val members: Array<IntArray>, - private val membership: Array<IntArray>, - private val random: SplittableRandom - ) : VmInterferenceDomain { - /** - * Keys registered with this domain. - */ - private val keys = HashMap<Int, InterferenceKeyImpl>() - - /** - * The set of keys active in this domain. - */ - private val activeKeys = ArrayList<InterferenceKeyImpl>() - - override fun createKey(id: String): InterferenceKey? { - val intId = idMapping[id] ?: return null - return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } - } - - override fun removeKey(key: InterferenceKey) { - if (key !is InterferenceKeyImpl) { - return - } - - if (activeKeys.remove(key)) { - computeActiveGroups(key.id) - } - - keys.remove(key.id) - } - - override fun join(key: InterferenceKey) { - if (key !is InterferenceKeyImpl) { - return - } - - if (key.acquire()) { - val pos = activeKeys.binarySearch(key) - if (pos < 0) { - activeKeys.add(-pos - 1, key) - } - computeActiveGroups(key.id) - } - } - override fun leave(key: InterferenceKey) { - if (key is InterferenceKeyImpl && key.release()) { - activeKeys.remove(key) - computeActiveGroups(key.id) - } + private companion object { + /** + * The initial capacity of the builder. + */ + const val INITIAL_CAPACITY = 256 } - - override fun apply(key: InterferenceKey?, load: Double): Double { - if (key == null || key !is InterferenceKeyImpl) { - return 1.0 - } - - val groups = key.groups - val groupSize = groups.size - - if (groupSize == 0) { - return 1.0 - } - - val targets = targets - val scores = scores - var low = 0 - var high = groups.size - 1 - - var group = -1 - var score = 1.0 - - // Perform binary search over the groups based on target load - while (low <= high) { - val mid = low + high ushr 1 - val midGroup = groups[mid] - val target = targets[midGroup] - - if (target < load) { - low = mid + 1 - group = midGroup - score = scores[midGroup] - } else if (target > load) { - high = mid - 1 - } else { - group = midGroup - score = scores[midGroup] - break - } - } - - return if (group >= 0 && random.nextInt(members[group].size) == 0) { - score - } else { - 1.0 - } - } - - override fun toString(): String = "VmInterferenceDomain" - - /** - * Queue of participants that will be removed or added to the active groups. - */ - private val _participants = ArrayDeque<InterferenceKeyImpl>() - - /** - * (Re-)Compute the active groups. - */ - private fun computeActiveGroups(id: Int) { - val activeKeys = activeKeys - val groups = membership[id] - - if (activeKeys.isEmpty()) { - return - } - - val members = members - val participants = _participants - - for (group in groups) { - val groupMembers = members[group] - - var i = 0 - var j = 0 - var intersection = 0 - - // Compute the intersection of the group members and the current active members - while (i < groupMembers.size && j < activeKeys.size) { - val l = groupMembers[i] - val rightEntry = activeKeys[j] - val r = rightEntry.id - - if (l < r) { - i++ - } else if (l > r) { - j++ - } else { - participants.add(rightEntry) - intersection++ - - i++ - j++ - } - } - - while (true) { - val participant = participants.poll() ?: break - val participantGroups = participant.groups - if (intersection <= 1) { - participantGroups.remove(group) - } else { - val pos = participantGroups.binarySearch(group) - if (pos < 0) { - participantGroups.add(-pos - 1, group) - } - } - } - } - } - } - - /** - * An interference key. - * - * @param id The identifier of the member. - */ - private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> { - /** - * The active groups to which the key belongs. - */ - @JvmField val groups: MutableList<Int> = ArrayList() - - /** - * The number of users of the interference key. - */ - private var refCount: Int = 0 - - /** - * Join the domain. - */ - fun acquire(): Boolean = refCount++ <= 0 - - /** - * Leave the domain. - */ - fun release(): Boolean = --refCount <= 0 - - override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt new file mode 100644 index 00000000..004dbd07 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.kernel.interference + +/** + * A profile of a particular virtual machine describing its interference pattern with other virtual machines. + * + * @param model The model to which this profile belongs. + * @property id The identifier of the profile inside the model. + * @property membership The membership of the profile in the groups. + * @param members The members in the model. + * @param targets The targets in the model. + * @param scores The scores in the model. + */ +public class VmInterferenceProfile internal constructor( + private val model: VmInterferenceModel, + private val id: Int, + private val membership: IntArray, + private val members: Array<IntArray>, + private val targets: DoubleArray, + private val scores: DoubleArray +) { + /** + * Create a new [VmInterferenceMember] based on this profile for the specified [domain]. + */ + internal fun newMember(domain: VmInterferenceDomain): VmInterferenceMember { + return VmInterferenceMember(domain, model, id, membership, members, targets, scores) + } + + override fun toString(): String = "VmInterferenceProfile[id=$id]" +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt index 7e4d7191..22dcaef4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt @@ -35,4 +35,20 @@ public data class MachineModel( public val memory: List<MemoryUnit>, public val net: List<NetworkAdapter> = emptyList(), public val storage: List<StorageDevice> = emptyList() -) +) { + /** + * Optimize the [MachineModel] by merging all resources of the same type into a single resource with the combined + * capacity. Such configurations can be simulated more efficiently by OpenDC. + */ + public fun optimize(): MachineModel { + val originalCpu = cpus[0] + val freq = cpus.sumOf { it.frequency } + val processingNode = originalCpu.node.copy(coreCount = 1) + val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode)) + + val memorySize = memory.sumOf { it.size } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return MachineModel(processingUnits, memoryUnits) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt index 6b820e5d..e66227b5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -31,14 +31,12 @@ import kotlin.math.min * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s. * * @param usageCol The column containing the CPU usage of each fragment (in MHz). - * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis). * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis). * @param coresCol The column containing the utilized cores. * @param size The number of fragments in the trace. */ public class SimTrace( private val usageCol: DoubleArray, - private val timestampCol: LongArray, private val deadlineCol: LongArray, private val coresCol: IntArray, private val size: Int, @@ -46,7 +44,6 @@ public class SimTrace( init { require(size >= 0) { "Invalid trace size" } require(usageCol.size >= size) { "Invalid number of usage entries" } - require(timestampCol.size >= size) { "Invalid number of timestamp entries" } require(deadlineCol.size >= size) { "Invalid number of deadline entries" } require(coresCol.size >= size) { "Invalid number of core entries" } } @@ -59,19 +56,17 @@ public class SimTrace( public fun ofFragments(fragments: List<SimTraceFragment>): SimTrace { val size = fragments.size val usageCol = DoubleArray(size) - val timestampCol = LongArray(size) val deadlineCol = LongArray(size) val coresCol = IntArray(size) for (i in fragments.indices) { val fragment = fragments[i] usageCol[i] = fragment.usage - timestampCol[i] = fragment.timestamp deadlineCol[i] = fragment.timestamp + fragment.duration coresCol[i] = fragment.cores } - return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + return SimTrace(usageCol, deadlineCol, coresCol, size) } /** @@ -81,19 +76,17 @@ public class SimTrace( public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace { val size = fragments.size val usageCol = DoubleArray(size) - val timestampCol = LongArray(size) val deadlineCol = LongArray(size) val coresCol = IntArray(size) for (i in fragments.indices) { val fragment = fragments[i] usageCol[i] = fragment.usage - timestampCol[i] = fragment.timestamp deadlineCol[i] = fragment.timestamp + fragment.duration coresCol[i] = fragment.cores } - return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + return SimTrace(usageCol, deadlineCol, coresCol, size) } /** @@ -108,25 +101,9 @@ public class SimTrace( * * @param cpu The [ProcessingUnit] for which to create the source. * @param offset The time offset to use for the trace. - * @param fillMode The [FillMode] for filling missing data. */ - public fun newSource(cpu: ProcessingUnit, offset: Long, fillMode: FillMode = FillMode.None): FlowSource { - return CpuConsumer(cpu, offset, fillMode, usageCol, timestampCol, deadlineCol, coresCol, size) - } - - /** - * An enumeration describing the modes for filling missing data. - */ - public enum class FillMode { - /** - * When a gap in the trace data occurs, the CPU usage will be set to zero. - */ - None, - - /** - * When a gap in the trace data occurs, the previous CPU usage will be used. - */ - Previous + public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource { + return CpuConsumer(cpu, offset, usageCol, deadlineCol, coresCol, size) } /** @@ -137,7 +114,6 @@ public class SimTrace( * The columns of the trace. */ private var usageCol: DoubleArray = DoubleArray(16) - private var timestampCol: LongArray = LongArray(16) private var deadlineCol: LongArray = LongArray(16) private var coresCol: IntArray = IntArray(16) @@ -150,25 +126,23 @@ public class SimTrace( * Add the specified [SimTraceFragment] to the trace. */ public fun add(fragment: SimTraceFragment) { - add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores) + add(fragment.timestamp + fragment.duration, fragment.usage, fragment.cores) } /** * Add a fragment to the trace. * - * @param timestamp Timestamp at which the fragment starts (in epoch millis). * @param deadline Timestamp at which the fragment ends (in epoch millis). * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ - public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + public fun add(deadline: Long, usage: Double, cores: Int) { val size = size if (size == usageCol.size) { grow() } - timestampCol[size] = timestamp deadlineCol[size] = deadline usageCol[size] = usage coresCol[size] = cores @@ -184,7 +158,6 @@ public class SimTrace( val newSize = arraySize + (arraySize shr 1) usageCol = usageCol.copyOf(newSize) - timestampCol = timestampCol.copyOf(newSize) deadlineCol = deadlineCol.copyOf(newSize) coresCol = coresCol.copyOf(newSize) } @@ -193,7 +166,7 @@ public class SimTrace( * Construct the immutable [SimTrace]. */ public fun build(): SimTrace { - return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + return SimTrace(usageCol, deadlineCol, coresCol, size) } } @@ -203,9 +176,7 @@ public class SimTrace( private class CpuConsumer( cpu: ProcessingUnit, private val offset: Long, - private val fillMode: FillMode, private val usageCol: DoubleArray, - private val timestampCol: LongArray, private val deadlineCol: LongArray, private val coresCol: IntArray, private val size: Int @@ -236,16 +207,6 @@ public class SimTrace( } _idx = idx - val timestamp = timestampCol[idx] - - // There is a gap in the trace, since the next fragment starts in the future. - if (timestamp > nowOffset) { - when (fillMode) { - FillMode.None -> conn.push(0.0) // Reset rate to zero - FillMode.Previous -> {} // Keep previous rate - } - return timestamp - nowOffset - } val cores = min(coreCount, coresCol[idx]) val usage = usageCol[idx] diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 91855e8d..ddf8cf14 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -43,11 +43,12 @@ import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import java.util.* /** * Test suite for the [SimHypervisor] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimFairShareHypervisorTest { private lateinit var model: MachineModel @@ -76,9 +77,9 @@ internal class SimFairShareHypervisorTest { ), ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), PerformanceScalingGovernor()) launch { machine.runWorkload(hypervisor) @@ -125,11 +126,9 @@ internal class SimFairShareHypervisorTest { ) ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimFairShareHypervisor(platform, null, null, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) @@ -166,9 +165,9 @@ internal class SimFairShareHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(platform, null, null, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) assertDoesNotThrow { launch { @@ -193,11 +192,9 @@ internal class SimFairShareHypervisorTest { .addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n")) .build() - val platform = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimFairShareHypervisor(platform, null, null, interferenceModel.newDomain()) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null) val duration = 5 * 60L val workloadA = @@ -225,12 +222,12 @@ internal class SimFairShareHypervisorTest { coroutineScope { launch { - val vm = hypervisor.newMachine(model, "a") - vm.runWorkload(workloadA) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadA, meta = mapOf("interference-model" to interferenceModel.getProfile("a")!!)) hypervisor.removeMachine(vm) } - val vm = hypervisor.newMachine(model, "b") - vm.runWorkload(workloadB) + val vm = hypervisor.newMachine(model) + vm.runWorkload(workloadB, meta = mapOf("interference-model" to interferenceModel.getProfile("b")!!)) hypervisor.removeMachine(vm) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 823a0ae3..df6755f1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute.kernel -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.* @@ -40,11 +39,12 @@ import org.opendc.simulator.compute.runWorkload import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexerFactory +import java.util.* /** - * A test suite for the [SimSpaceSharedHypervisor]. + * A test suite for a space-shared [SimHypervisor]. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimSpaceSharedHypervisorTest { private lateinit var machineModel: MachineModel @@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } val vm = hypervisor.newMachine(machineModel) @@ -97,7 +97,7 @@ internal class SimSpaceSharedHypervisorTest { val workload = SimRuntimeWorkload(duration) val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -118,10 +118,8 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -139,10 +137,8 @@ internal class SimSpaceSharedHypervisorTest { fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 val engine = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -169,7 +165,7 @@ internal class SimSpaceSharedHypervisorTest { fun testConcurrentWorkloadFails() = runBlockingSimulation { val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) - val hypervisor = SimSpaceSharedHypervisor(engine, null, null) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() @@ -189,11 +185,9 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { - val interpreter = FlowEngine(coroutineContext, clock) - val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null) launch { machine.runWorkload(hypervisor) } yield() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index a717ae6e..d8ad7978 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -42,11 +42,6 @@ public interface FlowCounters { public val remaining: Double /** - * The accumulated flow lost due to interference between sources. - */ - public val interference: Double - - /** * Reset the flow counters. */ public fun reset() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 0ad18f6a..6fa2971a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -258,6 +258,6 @@ public class FlowForwarder( val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.increment(work, actualWork, (total - actualWork), 0.0) + counters.increment(work, actualWork, (total - actualWork)) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index d0324ce8..ee8cd739 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -154,7 +154,7 @@ public class FlowSink( val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.increment(work, actualWork, (total - actualWork), 0.0) + counters.increment(work, actualWork, (total - actualWork)) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt deleted file mode 100644 index aa2713b6..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt +++ /dev/null @@ -1,19 +0,0 @@ -package org.opendc.simulator.flow.interference - -import org.opendc.simulator.flow.FlowSource - -/** - * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur - * performance variability due to operating on the same resource and therefore causing interference. - */ -public interface InterferenceDomain { - /** - * Compute the performance score of a participant in this interference domain. - * - * @param key The participant to obtain the score of or `null` if the participant has no key. - * @param load The overall load on the interference domain. - * @return A score representing the performance score to be applied to the resource consumer, with 1 - * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. - */ - public fun apply(key: InterferenceKey?, load: Double): Double -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt deleted file mode 100644 index d28ebde5..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.interference - -/** - * A key that uniquely identifies a participant of an interference domain. - */ -public interface InterferenceKey diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt index d990dc61..c320a362 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -34,23 +34,20 @@ public class MutableFlowCounters : FlowCounters { get() = _counters[1] override val remaining: Double get() = _counters[2] - override val interference: Double - get() = _counters[3] - private val _counters = DoubleArray(4) + private val _counters = DoubleArray(3) override fun reset() { _counters.fill(0.0) } - public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + public fun increment(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters[0] += demand counters[1] += actual counters[2] += remaining - counters[3] += interference } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 5f198944..8752c559 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -25,13 +25,22 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.FlowConsumer import org.opendc.simulator.flow.FlowCounters import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.interference.InterferenceKey /** * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. */ public interface FlowMultiplexer { /** + * The maximum number of inputs supported by the multiplexer. + */ + public val maxInputs: Int + + /** + * The maximum number of outputs supported by the multiplexer. + */ + public val maxOutputs: Int + + /** * The inputs of the multiplexer that can be used to consume sources. */ public val inputs: Set<FlowConsumer> @@ -63,18 +72,15 @@ public interface FlowMultiplexer { /** * Create a new input on this multiplexer with a coupled capacity. - * - * @param key The key of the interference member to which the input belongs. */ - public fun newInput(key: InterferenceKey? = null): FlowConsumer + public fun newInput(): FlowConsumer /** * Create a new input on this multiplexer with the specified [capacity]. * * @param capacity The capacity of the input. - * @param key The key of the interference member to which the input belongs. */ - public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + public fun newInput(capacity: Double): FlowConsumer /** * Remove [input] from this multiplexer. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt new file mode 100644 index 00000000..a863e3ad --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.FlowConvergenceListener +import org.opendc.simulator.flow.FlowEngine + +/** + * Factory interface for a [FlowMultiplexer] implementation. + */ +public fun interface FlowMultiplexerFactory { + /** + * Construct a new [FlowMultiplexer] using the specified [engine] and [listener]. + */ + public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer + + public companion object { + /** + * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer]. + */ + private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) } + + /** + * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer]. + */ + private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) } + + /** + * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances. + */ + @JvmStatic + public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY + + /** + * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances. + */ + @JvmStatic + public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 1d7d22ef..c50e9bbc 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque /** @@ -38,6 +37,12 @@ public class ForwardingFlowMultiplexer( private val engine: FlowEngine, private val listener: FlowConvergenceListener? = null ) : FlowMultiplexer, FlowConvergenceListener { + + override val maxInputs: Int + get() = _outputs.size + + override val maxOutputs: Int = Int.MAX_VALUE + override val inputs: Set<FlowConsumer> get() = _inputs private val _inputs = mutableSetOf<Input>() @@ -54,8 +59,6 @@ public class ForwardingFlowMultiplexer( get() = _outputs.sumOf { it.forwarder.counters.actual } override val remaining: Double get() = _outputs.sumOf { it.forwarder.counters.remaining } - override val interference: Double - get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { for (output in _outputs) { @@ -75,14 +78,14 @@ public class ForwardingFlowMultiplexer( override val capacity: Double get() = _outputs.sumOf { it.forwarder.capacity } - override fun newInput(key: InterferenceKey?): FlowConsumer { + override fun newInput(): FlowConsumer { val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } val input = Input(output) _inputs += input return input } - override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun newInput(capacity: Double): FlowConsumer = newInput() override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index cc831862..f2a4c1a4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -23,11 +23,8 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceDomain -import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max import kotlin.math.min /** @@ -35,13 +32,16 @@ import kotlin.math.min * * @param engine The [FlowEngine] to drive the flow simulation. * @param parent The parent flow system of the multiplexer. - * @param interferenceDomain The interference domain of the multiplexer. */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - parent: FlowConvergenceListener? = null, - private val interferenceDomain: InterferenceDomain? = null + parent: FlowConvergenceListener? = null ) : FlowMultiplexer { + + override val maxInputs: Int = Int.MAX_VALUE + + override val maxOutputs: Int = Int.MAX_VALUE + /** * The inputs of the multiplexer. */ @@ -85,16 +85,16 @@ public class MaxMinFlowMultiplexer( */ private val scheduler = Scheduler(engine, parent) - override fun newInput(key: InterferenceKey?): FlowConsumer { - return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + override fun newInput(): FlowConsumer { + return newInput(isCoupled = true, Double.POSITIVE_INFINITY) } - override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { - return newInput(isCoupled = false, capacity, key) + override fun newInput(capacity: Double): FlowConsumer { + return newInput(isCoupled = false, capacity) } - private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) + private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer { + val provider = Input(engine, scheduler, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -499,8 +499,7 @@ public class MaxMinFlowMultiplexer( counters.increment( demand = demand * deltaS, actual = rate * deltaS, - remaining = (previousCapacity - rate) * deltaS, - interference = 0.0 + remaining = (previousCapacity - rate) * deltaS ) } } @@ -511,8 +510,6 @@ public class MaxMinFlowMultiplexer( private class Input( private val engine: FlowEngine, private val scheduler: Scheduler, - private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey?, @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { @@ -693,24 +690,15 @@ public class MaxMinFlowMultiplexer( return } - // Compute the performance penalty due to flow interference - val perfScore = if (interferenceDomain != null) { - val load = scheduler.rate / scheduler.capacity - interferenceDomain.apply(key, load) - } else { - 1.0 - } - val actualRate = actualRate val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS val remaining = (_capacity - actualRate) * deltaS - val interference = actual * max(0.0, 1 - perfScore) - _counters.increment(demand, actual, remaining, interference) - scheduler.counters.increment(0.0, 0.0, 0.0, interference) + _counters.increment(demand, actual, remaining) + scheduler.counters.increment(0.0, 0.0, 0.0) } } |
