summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt27
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt379
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt56
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt42
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt384
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt51
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt45
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt42
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt114
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt163
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt250
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt51
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt53
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt39
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt32
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt18
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt60
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt13
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt42
29 files changed, 869 insertions, 1094 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 91e91f9d..797d424e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -22,11 +22,9 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.kernel.SimFairShareHypervisor
-import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisor
+import org.opendc.simulator.compute.kernel.SimHypervisor
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -37,7 +35,9 @@ import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
import org.openjdk.jmh.annotations.*
+import java.util.SplittableRandom
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
@@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit
@Fork(1)
@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
-@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var machineModel: MachineModel
private lateinit var trace: SimTrace
@@ -64,7 +63,7 @@ class SimMachineBenchmarks {
repeat(10000) {
val timestamp = it.toLong()
val deadline = timestamp + 1000
- builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1)
+ builder.add(deadline, random.nextDouble(0.0, 4500.0), 1)
}
trace = builder.build()
}
@@ -84,10 +83,8 @@ class SimMachineBenchmarks {
fun benchmarkSpaceSharedHypervisor() {
return runBlockingSimulation {
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
@@ -106,10 +103,8 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorSingle() {
return runBlockingSimulation {
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimFairShareHypervisor(engine, null, null, null)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
@@ -128,10 +123,8 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorDouble() {
return runBlockingSimulation {
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimFairShareHypervisor(engine, null, null, null)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index e14ea507..ef0cd323 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -36,12 +36,10 @@ import org.opendc.simulator.flow.*
* Abstract implementation of the [SimMachine] interface.
*
* @param engine The engine to manage the machine's resources.
- * @param parent The parent simulation system.
* @param model The model of the machine.
*/
public abstract class SimAbstractMachine(
protected val engine: FlowEngine,
- private val parent: FlowConvergenceListener?,
final override val model: MachineModel
) : SimMachine, FlowConvergenceListener {
/**
@@ -86,9 +84,7 @@ public abstract class SimAbstractMachine(
_ctx?.close()
}
- override fun onConverge(now: Long) {
- parent?.onConverge(now)
- }
+ override fun onConverge(now: Long) {}
/**
* The execution context in which the workload runs.
@@ -105,7 +101,7 @@ public abstract class SimAbstractMachine(
*/
private var isClosed = false
- override val engine: FlowEngine = this@SimAbstractMachine.engine
+ val engine: FlowEngine = this@SimAbstractMachine.engine
/**
* Start this context.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 63a82e78..0df897b6 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -40,15 +40,13 @@ import kotlin.math.max
* @param model The machine model to simulate.
* @param powerDriver The power driver to use.
* @param psu The power supply of the machine.
- * @param parent The parent simulation system.
*/
public class SimBareMetalMachine(
engine: FlowEngine,
model: MachineModel,
powerDriver: PowerDriver,
public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)),
- parent: FlowConvergenceListener? = null,
-) : SimAbstractMachine(engine, parent, model) {
+) : SimAbstractMachine(engine, model) {
/**
* The current power usage of the machine (without PSU loss) in W.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index 1317f728..5e3a7766 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.flow.FlowEngine
-
/**
* A simulated execution context in which a bootable image runs. This interface represents the
* firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on
@@ -31,11 +29,6 @@ import org.opendc.simulator.flow.FlowEngine
*/
public interface SimMachineContext : AutoCloseable {
/**
- * The [FlowEngine] that simulates the machine.
- */
- public val engine: FlowEngine
-
- /**
* The metadata associated with the context.
*/
public val meta: Map<String, Any>
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
deleted file mode 100644
index 8e925bdf..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel
-
-import org.opendc.simulator.compute.*
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow.*
-import org.opendc.simulator.flow.interference.InterferenceKey
-import org.opendc.simulator.flow.mux.FlowMultiplexer
-import kotlin.math.roundToLong
-
-/**
- * Abstract implementation of the [SimHypervisor] interface.
- *
- * @param engine The [FlowEngine] to drive the simulation.
- * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware.
- */
-public abstract class SimAbstractHypervisor(
- protected val engine: FlowEngine,
- private val listener: FlowConvergenceListener?,
- private val scalingGovernor: ScalingGovernor?,
- protected val interferenceDomain: VmInterferenceDomain? = null
-) : SimHypervisor, FlowConvergenceListener {
- /**
- * The machine on which the hypervisor runs.
- */
- protected lateinit var context: SimMachineContext
-
- /**
- * The resource switch to use.
- */
- protected abstract val mux: FlowMultiplexer
-
- /**
- * The virtual machines running on this hypervisor.
- */
- private val _vms = mutableSetOf<VirtualMachine>()
- override val vms: Set<SimMachine>
- get() = _vms
-
- /**
- * The resource counters associated with the hypervisor.
- */
- public override val counters: SimHypervisorCounters
- get() = _counters
- private val _counters = CountersImpl(this)
-
- /**
- * The CPU capacity of the hypervisor in MHz.
- */
- override val cpuCapacity: Double
- get() = mux.capacity
-
- /**
- * The CPU demand of the hypervisor in MHz.
- */
- override val cpuDemand: Double
- get() = mux.demand
-
- /**
- * The CPU usage of the hypervisor in MHz.
- */
- override val cpuUsage: Double
- get() = mux.rate
-
- /**
- * The scaling governors attached to the physical CPUs backing this hypervisor.
- */
- private val governors = mutableListOf<ScalingGovernor.Logic>()
-
- /* SimHypervisor */
- override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine {
- require(canFit(model)) { "Machine does not fit" }
- val vm = VirtualMachine(model, interferenceId)
- _vms.add(vm)
- return vm
- }
-
- override fun removeMachine(machine: SimVirtualMachine) {
- if (_vms.remove(machine)) {
- // This cast must always succeed, since `_vms` only contains `VirtualMachine` types.
- (machine as VirtualMachine).close()
- }
- }
-
- /* SimWorkload */
- override fun onStart(ctx: SimMachineContext) {
- context = ctx
-
- _cpuCount = ctx.cpus.size
- _cpuCapacity = ctx.cpus.sumOf { it.model.frequency }
- _counters.d = _cpuCount / _cpuCapacity * 1000L
-
- // Clear the existing outputs of the multiplexer
- mux.clearOutputs()
-
- for (cpu in ctx.cpus) {
- val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu))
- if (governor != null) {
- governors.add(governor)
- governor.onStart()
- }
-
- cpu.startConsumer(mux.newOutput())
- }
- }
-
- override fun onStop(ctx: SimMachineContext) {}
-
- private var _cpuCount = 0
- private var _cpuCapacity = 0.0
- private var _lastConverge = engine.clock.millis()
-
- /* FlowConvergenceListener */
- override fun onConverge(now: Long) {
- val lastConverge = _lastConverge
- _lastConverge = now
- val delta = now - lastConverge
-
- if (delta > 0) {
- _counters.record()
- }
-
- val load = cpuDemand / cpuCapacity
- for (governor in governors) {
- governor.onLimit(load)
- }
-
- listener?.onConverge(now)
- }
-
- /**
- * A virtual machine running on the hypervisor.
- *
- * @param model The machine model of the virtual machine.
- */
- private inner class VirtualMachine(
- model: MachineModel,
- interferenceId: String? = null
- ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable {
- /**
- * A flag to indicate that the machine is closed.
- */
- private var isClosed = false
-
- /**
- * The interference key of this virtual machine.
- */
- private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) }
-
- /**
- * The vCPUs of the machine.
- */
- override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) }
-
- /**
- * The resource counters associated with the hypervisor.
- */
- override val counters: SimHypervisorCounters
- get() = _counters
- private val _counters = VmCountersImpl(cpus)
-
- /**
- * The CPU capacity of the hypervisor in MHz.
- */
- override val cpuCapacity: Double
- get() = cpus.sumOf(FlowConsumer::capacity)
-
- /**
- * The CPU demand of the hypervisor in MHz.
- */
- override val cpuDemand: Double
- get() = cpus.sumOf(FlowConsumer::demand)
-
- /**
- * The CPU usage of the hypervisor in MHz.
- */
- override val cpuUsage: Double
- get() = cpus.sumOf(FlowConsumer::rate)
-
- override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
- check(!isClosed) { "Machine is closed" }
-
- return super.startWorkload(
- object : SimWorkload {
- override fun onStart(ctx: SimMachineContext) {
- try {
- joinInterferenceDomain()
- workload.onStart(ctx)
- } catch (cause: Throwable) {
- leaveInterferenceDomain()
- throw cause
- }
- }
-
- override fun onStop(ctx: SimMachineContext) {
- leaveInterferenceDomain()
- workload.onStop(ctx)
- }
- },
- meta
- )
- }
-
- override fun close() {
- if (isClosed) {
- return
- }
-
- isClosed = true
- cancel()
-
- for (cpu in cpus) {
- cpu.close()
- }
- }
-
- /**
- * Join the interference domain of the hypervisor.
- */
- private fun joinInterferenceDomain() {
- val interferenceKey = interferenceKey
- if (interferenceKey != null) {
- interferenceDomain?.join(interferenceKey)
- }
- }
-
- /**
- * Leave the interference domain of the hypervisor.
- */
- private fun leaveInterferenceDomain() {
- val interferenceKey = interferenceKey
- if (interferenceKey != null) {
- interferenceDomain?.leave(interferenceKey)
- }
- }
- }
-
- /**
- * A [SimProcessingUnit] of a virtual machine.
- */
- private class VCpu(
- private val switch: FlowMultiplexer,
- private val source: FlowConsumer,
- override val model: ProcessingUnit
- ) : SimProcessingUnit, FlowConsumer by source {
- override var capacity: Double
- get() = source.capacity
- set(_) = TODO("Capacity changes on vCPU not supported")
-
- override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
-
- /**
- * Close the CPU
- */
- fun close() {
- switch.removeInput(source)
- }
-
- fun flush() {
- switch.flushCounters(source)
- }
- }
-
- /**
- * A [ScalingPolicy] for a physical CPU of the hypervisor.
- */
- private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy {
- override var target: Double
- get() = cpu.capacity
- set(value) {
- cpu.capacity = value
- }
-
- override val max: Double = cpu.model.frequency
-
- override val min: Double = 0.0
- }
-
- /**
- * Implementation of [SimHypervisorCounters].
- */
- private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters {
- @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity
-
- override val cpuActiveTime: Long
- get() = _cpuTime[0]
- override val cpuIdleTime: Long
- get() = _cpuTime[1]
- override val cpuStealTime: Long
- get() = _cpuTime[2]
- override val cpuLostTime: Long
- get() = _cpuTime[3]
-
- private val _cpuTime = LongArray(4)
- private val _previous = DoubleArray(4)
-
- /**
- * Record the CPU time of the hypervisor.
- */
- fun record() {
- val cpuTime = _cpuTime
- val previous = _previous
- val counters = hv.mux.counters
-
- val demand = counters.demand
- val actual = counters.actual
- val remaining = counters.remaining
- val interference = counters.interference
-
- val demandDelta = demand - previous[0]
- val actualDelta = actual - previous[1]
- val remainingDelta = remaining - previous[2]
- val interferenceDelta = interference - previous[3]
-
- previous[0] = demand
- previous[1] = actual
- previous[2] = remaining
- previous[3] = interference
-
- cpuTime[0] += (actualDelta * d).roundToLong()
- cpuTime[1] += (remainingDelta * d).roundToLong()
- cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong()
- cpuTime[3] += (interferenceDelta * d).roundToLong()
- }
-
- override fun flush() {
- hv.mux.flushCounters()
- record()
- }
- }
-
- /**
- * A [SimHypervisorCounters] implementation for a virtual machine.
- */
- private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters {
- private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
-
- override val cpuActiveTime: Long
- get() = (cpus.sumOf { it.counters.actual } * d).roundToLong()
- override val cpuIdleTime: Long
- get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong()
- override val cpuStealTime: Long
- get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
- override val cpuLostTime: Long
- get() = (cpus.sumOf { it.counters.interference } * d).roundToLong()
-
- override fun flush() {
- for (cpu in cpus) {
- cpu.flush()
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
deleted file mode 100644
index 36f76650..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel
-
-import org.opendc.simulator.compute.SimMachine
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.mux.FlowMultiplexer
-import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
-
-/**
- * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine]
- * concurrently using weighted fair sharing.
- *
- * @param engine The [FlowEngine] to manage the machine's resources.
- * @param listener The listener for the convergence of the system.
- * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
- * @param interferenceDomain The resource interference domain to which the hypervisor belongs.
- */
-public class SimFairShareHypervisor(
- engine: FlowEngine,
- listener: FlowConvergenceListener?,
- scalingGovernor: ScalingGovernor?,
- interferenceDomain: VmInterferenceDomain?,
-) : SimAbstractHypervisor(engine, listener, scalingGovernor, interferenceDomain) {
- /**
- * The multiplexer that distributes the computing capacity.
- */
- override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
-
- override fun canFit(model: MachineModel): Boolean = true
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
deleted file mode 100644
index 3136f4c8..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel
-
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-
-/**
- * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
- */
-public class SimFairShareHypervisorProvider : SimHypervisorProvider {
- override val id: String = "fair-share"
-
- override fun create(
- engine: FlowEngine,
- listener: FlowConvergenceListener?,
- scalingGovernor: ScalingGovernor?,
- interferenceDomain: VmInterferenceDomain?,
- ): SimHypervisor = SimFairShareHypervisor(engine, listener, scalingGovernor, interferenceDomain)
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index a69f419f..7594bf4d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -22,57 +22,415 @@
package org.opendc.simulator.compute.kernel
-import org.opendc.simulator.compute.SimMachine
+import org.opendc.simulator.compute.*
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
+import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceMember
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile
import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
+import java.util.SplittableRandom
+import kotlin.math.roundToLong
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
* to another [SimMachine].
+ *
+ * @param engine The [FlowEngine] to drive the simulation.
+ * @param muxFactory The factor for the [FlowMultiplexer] to multiplex the workloads.
+ * @param random A randomness generator for the interference calculations.
+ * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware.
+ * @param interferenceDomain The interference domain to which the hypervisor belongs.
*/
-public interface SimHypervisor : SimWorkload {
+public class SimHypervisor(
+ private val engine: FlowEngine,
+ muxFactory: FlowMultiplexerFactory,
+ private val random: SplittableRandom,
+ private val scalingGovernor: ScalingGovernor? = null,
+ private val interferenceDomain: VmInterferenceDomain = VmInterferenceDomain()
+) : SimWorkload, FlowConvergenceListener {
+ /**
+ * The [FlowMultiplexer] to multiplex the virtual machines.
+ */
+ private val mux = muxFactory.newMultiplexer(engine, this)
+
/**
- * The machines running on the hypervisor.
+ * The virtual machines running on this hypervisor.
*/
+ private val _vms = mutableSetOf<VirtualMachine>()
public val vms: Set<SimMachine>
+ get() = _vms
/**
* The resource counters associated with the hypervisor.
*/
public val counters: SimHypervisorCounters
+ get() = _counters
+ private val _counters = CountersImpl(this)
/**
- * The CPU usage of the hypervisor in MHz.
+ * The CPU capacity of the hypervisor in MHz.
*/
- public val cpuUsage: Double
+ public val cpuCapacity: Double
+ get() = mux.capacity
/**
- * The CPU usage of the hypervisor in MHz.
+ * The CPU demand of the hypervisor in MHz.
*/
public val cpuDemand: Double
+ get() = mux.demand
/**
- * The CPU capacity of the hypervisor in MHz.
+ * The CPU usage of the hypervisor in MHz.
*/
- public val cpuCapacity: Double
+ public val cpuUsage: Double
+ get() = mux.rate
/**
- * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
+ * The machine on which the hypervisor runs.
+ */
+ private lateinit var context: SimMachineContext
+
+ /**
+ * The scaling governors attached to the physical CPUs backing this hypervisor.
*/
- public fun canFit(model: MachineModel): Boolean
+ private val governors = mutableListOf<ScalingGovernor.Logic>()
+ /* SimHypervisor */
/**
* Create a [SimMachine] instance on which users may run a [SimWorkload].
*
* @param model The machine to create.
- * @param interferenceId An identifier for the interference model.
*/
- public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine
+ public fun newMachine(model: MachineModel): SimVirtualMachine {
+ require(canFit(model)) { "Machine does not fit" }
+ val vm = VirtualMachine(model)
+ _vms.add(vm)
+ return vm
+ }
/**
* Remove the specified [machine] from the hypervisor.
*
* @param machine The machine to remove.
*/
- public fun removeMachine(machine: SimVirtualMachine)
+ public fun removeMachine(machine: SimVirtualMachine) {
+ if (_vms.remove(machine)) {
+ // This cast must always succeed, since `_vms` only contains `VirtualMachine` types.
+ (machine as VirtualMachine).close()
+ }
+ }
+
+ /**
+ * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
+ */
+ public fun canFit(model: MachineModel): Boolean {
+ return (mux.maxInputs - mux.inputs.size) >= model.cpus.size
+ }
+
+ /* SimWorkload */
+ override fun onStart(ctx: SimMachineContext) {
+ context = ctx
+
+ _cpuCount = ctx.cpus.size
+ _cpuCapacity = ctx.cpus.sumOf { it.model.frequency }
+ _counters.d = _cpuCount / _cpuCapacity * 1000L
+
+ // Clear the existing outputs of the multiplexer
+ mux.clearOutputs()
+
+ for (cpu in ctx.cpus) {
+ val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu))
+ if (governor != null) {
+ governors.add(governor)
+ governor.onStart()
+ }
+
+ cpu.startConsumer(mux.newOutput())
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {}
+
+ private var _cpuCount = 0
+ private var _cpuCapacity = 0.0
+ private var _lastConverge = engine.clock.millis()
+
+ /* FlowConvergenceListener */
+ override fun onConverge(now: Long) {
+ val lastConverge = _lastConverge
+ _lastConverge = now
+ val delta = now - lastConverge
+
+ if (delta > 0) {
+ _counters.record()
+
+ val mux = mux
+ val load = mux.rate / mux.capacity.coerceAtLeast(1.0)
+ val random = random
+
+ for (vm in _vms) {
+ vm._counters.record(random, load)
+ }
+ }
+
+ val load = cpuDemand / cpuCapacity
+ for (governor in governors) {
+ governor.onLimit(load)
+ }
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @param model The machine model of the virtual machine.
+ */
+ private inner class VirtualMachine(model: MachineModel) : SimAbstractMachine(engine, model), SimVirtualMachine, AutoCloseable {
+ /**
+ * A flag to indicate that the machine is closed.
+ */
+ private var isClosed = false
+
+ /**
+ * The vCPUs of the machine.
+ */
+ override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) }
+
+ /**
+ * The resource counters associated with the hypervisor.
+ */
+ override val counters: SimHypervisorCounters
+ get() = _counters
+ @JvmField val _counters = VmCountersImpl(cpus, null)
+
+ /**
+ * The CPU capacity of the hypervisor in MHz.
+ */
+ override val cpuCapacity: Double
+ get() = cpus.sumOf(FlowConsumer::capacity)
+
+ /**
+ * The CPU demand of the hypervisor in MHz.
+ */
+ override val cpuDemand: Double
+ get() = cpus.sumOf(FlowConsumer::demand)
+
+ /**
+ * The CPU usage of the hypervisor in MHz.
+ */
+ override val cpuUsage: Double
+ get() = cpus.sumOf(FlowConsumer::rate)
+
+ override fun startWorkload(workload: SimWorkload, meta: Map<String, Any>): SimMachineContext {
+ check(!isClosed) { "Machine is closed" }
+
+ val profile = meta["interference-profile"] as? VmInterferenceProfile
+ val interferenceMember = if (profile != null) interferenceDomain.join(profile) else null
+
+ val counters = _counters
+ counters.member = interferenceMember
+
+ return super.startWorkload(
+ object : SimWorkload {
+ override fun onStart(ctx: SimMachineContext) {
+ try {
+ interferenceMember?.activate()
+ workload.onStart(ctx)
+ } catch (cause: Throwable) {
+ interferenceMember?.deactivate()
+ throw cause
+ }
+ }
+
+ override fun onStop(ctx: SimMachineContext) {
+ interferenceMember?.deactivate()
+ counters.member = null
+ workload.onStop(ctx)
+ }
+ },
+ meta
+ )
+ }
+
+ override fun close() {
+ if (isClosed) {
+ return
+ }
+
+ isClosed = true
+ cancel()
+
+ for (cpu in cpus) {
+ cpu.close()
+ }
+ }
+ }
+
+ /**
+ * A [SimProcessingUnit] of a virtual machine.
+ */
+ private class VCpu(
+ private val switch: FlowMultiplexer,
+ private val source: FlowConsumer,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, FlowConsumer by source {
+ override var capacity: Double
+ get() = source.capacity
+ set(_) = TODO("Capacity changes on vCPU not supported")
+
+ override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
+
+ /**
+ * Close the CPU
+ */
+ fun close() {
+ switch.removeInput(source)
+ }
+
+ fun flush() {
+ switch.flushCounters(source)
+ }
+ }
+
+ /**
+ * A [ScalingPolicy] for a physical CPU of the hypervisor.
+ */
+ private class ScalingPolicyImpl(override val cpu: SimProcessingUnit) : ScalingPolicy {
+ override var target: Double
+ get() = cpu.capacity
+ set(value) {
+ cpu.capacity = value
+ }
+
+ override val max: Double = cpu.model.frequency
+
+ override val min: Double = 0.0
+ }
+
+ /**
+ * Implementation of [SimHypervisorCounters].
+ */
+ private class CountersImpl(private val hv: SimHypervisor) : SimHypervisorCounters {
+ @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity
+
+ override val cpuActiveTime: Long
+ get() = _cpuTime[0]
+ override val cpuIdleTime: Long
+ get() = _cpuTime[1]
+ override val cpuStealTime: Long
+ get() = _cpuTime[2]
+ override val cpuLostTime: Long
+ get() = _cpuTime[3]
+
+ val _cpuTime = LongArray(4)
+ private val _previous = DoubleArray(3)
+
+ /**
+ * Record the CPU time of the hypervisor.
+ */
+ fun record() {
+ val cpuTime = _cpuTime
+ val previous = _previous
+ val counters = hv.mux.counters
+
+ val demand = counters.demand
+ val actual = counters.actual
+ val remaining = counters.remaining
+
+ val demandDelta = demand - previous[0]
+ val actualDelta = actual - previous[1]
+ val remainingDelta = remaining - previous[2]
+
+ previous[0] = demand
+ previous[1] = actual
+ previous[2] = remaining
+
+ cpuTime[0] += (actualDelta * d).roundToLong()
+ cpuTime[1] += (remainingDelta * d).roundToLong()
+ cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong()
+ }
+
+ override fun flush() {
+ hv.mux.flushCounters()
+ record()
+ }
+ }
+
+ /**
+ * A [SimHypervisorCounters] implementation for a virtual machine.
+ */
+ private inner class VmCountersImpl(
+ private val cpus: List<VCpu>,
+ @JvmField var member: VmInterferenceMember?
+ ) : SimHypervisorCounters {
+ private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000
+
+ override val cpuActiveTime: Long
+ get() = _cpuTime[0]
+ override val cpuIdleTime: Long
+ get() = _cpuTime[1]
+ override val cpuStealTime: Long
+ get() = _cpuTime[2]
+ override val cpuLostTime: Long
+ get() = _cpuTime[3]
+
+ private val _cpuTime = LongArray(4)
+ private val _previous = DoubleArray(3)
+
+ /**
+ * Record the CPU time of the hypervisor.
+ */
+ fun record(random: SplittableRandom, load: Double) {
+ val cpuTime = _cpuTime
+ val previous = _previous
+
+ var demand = 0.0
+ var actual = 0.0
+ var remaining = 0.0
+
+ for (cpu in cpus) {
+ val counters = cpu.counters
+
+ actual += counters.actual
+ demand += counters.demand
+ remaining += counters.remaining
+ }
+
+ val demandDelta = demand - previous[0]
+ val actualDelta = actual - previous[1]
+ val remainingDelta = remaining - previous[2]
+
+ previous[0] = demand
+ previous[1] = actual
+ previous[2] = remaining
+
+ val d = d
+ cpuTime[0] += (actualDelta * d).roundToLong()
+ cpuTime[1] += (remainingDelta * d).roundToLong()
+ cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong()
+
+ // Compute the performance penalty due to flow interference
+ val member = member
+ if (member != null) {
+ val penalty = 1 - member.apply(random, load)
+ val interference = (actualDelta * d * penalty).roundToLong()
+
+ if (interference > 0) {
+ cpuTime[3] += interference
+ _counters._cpuTime[3] += interference
+ }
+ }
+ }
+
+ override fun flush() {
+ for (cpu in cpus) {
+ cpu.flush()
+ }
+ }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
deleted file mode 100644
index 483217af..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel
-
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-
-/**
- * A service provider interface for constructing a [SimHypervisor].
- */
-public interface SimHypervisorProvider {
- /**
- * A unique identifier for this hypervisor implementation.
- *
- * Each hypervisor must provide a unique ID, so that they can be selected by the user.
- * When in doubt, you may use the fully qualified name of your custom [SimHypervisor] implementation class.
- */
- public val id: String
-
- /**
- * Create a new [SimHypervisor] instance.
- */
- public fun create(
- engine: FlowEngine,
- listener: FlowConvergenceListener? = null,
- scalingGovernor: ScalingGovernor? = null,
- interferenceDomain: VmInterferenceDomain? = null,
- ): SimHypervisor
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
deleted file mode 100644
index 3f3bf6ad..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel
-
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.mux.FlowMultiplexer
-import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
-
-/**
- * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
- */
-public class SimSpaceSharedHypervisor(
- engine: FlowEngine,
- listener: FlowConvergenceListener?,
- scalingGovernor: ScalingGovernor?,
-) : SimAbstractHypervisor(engine, listener, scalingGovernor) {
- override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this)
-
- override fun canFit(model: MachineModel): Boolean {
- return mux.outputs.size - mux.inputs.size >= model.cpus.size
- }
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
deleted file mode 100644
index dd6fb0b1..00000000
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.compute.kernel
-
-import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-
-/**
- * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
- */
-public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
- override val id: String = "space-shared"
-
- override fun create(
- engine: FlowEngine,
- listener: FlowConvergenceListener?,
- scalingGovernor: ScalingGovernor?,
- interferenceDomain: VmInterferenceDomain?,
- ): SimHypervisor = SimSpaceSharedHypervisor(engine, listener, scalingGovernor)
-}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index 09b03306..6861823b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -22,38 +22,110 @@
package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.flow.interference.InterferenceDomain
-import org.opendc.simulator.flow.interference.InterferenceKey
+import java.util.ArrayDeque
+import java.util.ArrayList
+import java.util.WeakHashMap
/**
- * The interference domain of a hypervisor.
+ * A domain where virtual machines may incur performance variability due to operating on the same resource and
+ * therefore causing interference.
*/
-public interface VmInterferenceDomain : InterferenceDomain {
+public class VmInterferenceDomain {
/**
- * Construct an [InterferenceKey] for the specified [id].
- *
- * @param id The identifier of the virtual machine.
- * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine
- * does not participate in the domain.
+ * A cache to maintain a mapping between the active profiles in this domain.
*/
- public fun createKey(id: String): InterferenceKey?
+ private val cache = WeakHashMap<VmInterferenceProfile, VmInterferenceMember>()
/**
- * Remove the specified [key] from this domain.
+ * The set of members active in this domain.
*/
- public fun removeKey(key: InterferenceKey)
+ private val activeKeys = ArrayList<VmInterferenceMember>()
/**
- * Mark the specified [key] as active in this interference domain.
- *
- * @param key The key to join the interference domain with.
+ * Queue of participants that will be removed or added to the active groups.
*/
- public fun join(key: InterferenceKey)
+ private val participants = ArrayDeque<VmInterferenceMember>()
/**
- * Mark the specified [key] as inactive in this interference domain.
- *
- * @param key The key of the virtual machine that wants to leave the domain.
+ * Join this interference domain with the specified [profile] and return the [VmInterferenceMember] associated with
+ * the profile. If the member does not exist, it will be created.
*/
- public fun leave(key: InterferenceKey)
+ public fun join(profile: VmInterferenceProfile): VmInterferenceMember {
+ return cache.computeIfAbsent(profile) { key -> key.newMember(this) }
+ }
+
+ /**
+ * Mark the specified [member] as active in this interference domain.
+ */
+ internal fun activate(member: VmInterferenceMember) {
+ val activeKeys = activeKeys
+ val pos = activeKeys.binarySearch(member)
+ if (pos < 0) {
+ activeKeys.add(-pos - 1, member)
+ }
+
+ computeActiveGroups(activeKeys, member)
+ }
+
+ /**
+ * Mark the specified [member] as inactive in this interference domain.
+ */
+ internal fun deactivate(member: VmInterferenceMember) {
+ val activeKeys = activeKeys
+ activeKeys.remove(member)
+ computeActiveGroups(activeKeys, member)
+ }
+
+ /**
+ * (Re-)compute the active groups.
+ */
+ private fun computeActiveGroups(activeKeys: ArrayList<VmInterferenceMember>, member: VmInterferenceMember) {
+ if (activeKeys.isEmpty()) {
+ return
+ }
+
+ val groups = member.membership
+ val members = member.members
+ val participants = participants
+
+ for (group in groups) {
+ val groupMembers = members[group]
+
+ var i = 0
+ var j = 0
+ var intersection = 0
+
+ // Compute the intersection of the group members and the current active members
+ while (i < groupMembers.size && j < activeKeys.size) {
+ val l = groupMembers[i]
+ val rightEntry = activeKeys[j]
+ val r = rightEntry.id
+
+ if (l < r) {
+ i++
+ } else if (l > r) {
+ j++
+ } else {
+ if (++intersection > 1) {
+ rightEntry.addGroup(group)
+ } else {
+ participants.add(rightEntry)
+ }
+
+ i++
+ j++
+ }
+ }
+
+ while (true) {
+ val participant = participants.poll() ?: break
+
+ if (intersection <= 1) {
+ participant.removeGroup(group)
+ } else {
+ participant.addGroup(group)
+ }
+ }
+ }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt
new file mode 100644
index 00000000..762bb568
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceMember.kt
@@ -0,0 +1,163 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel.interference
+
+import java.util.*
+
+/**
+ * A participant of an interference domain.
+ */
+public class VmInterferenceMember(
+ private val domain: VmInterferenceDomain,
+ private val model: VmInterferenceModel,
+ @JvmField internal val id: Int,
+ @JvmField internal val membership: IntArray,
+ @JvmField internal val members: Array<IntArray>,
+ private val targets: DoubleArray,
+ private val scores: DoubleArray
+) : Comparable<VmInterferenceMember> {
+ /**
+ * The active groups to which the key belongs.
+ */
+ private var groups: IntArray = IntArray(2)
+ private var groupsSize: Int = 0
+
+ /**
+ * The number of users of the interference key.
+ */
+ private var refCount: Int = 0
+
+ /**
+ * Mark this member as active in this interference domain.
+ */
+ public fun activate() {
+ if (refCount++ <= 0) {
+ domain.activate(this)
+ }
+ }
+
+ /**
+ * Mark this member as inactive in this interference domain.
+ */
+ public fun deactivate() {
+ if (--refCount <= 0) {
+ domain.deactivate(this)
+ }
+ }
+
+ /**
+ * Compute the performance score of the member in this interference domain.
+ *
+ * @param random The source of randomness to apply when computing the performance score.
+ * @param load The overall load on the interference domain.
+ * @return A score representing the performance score to be applied to the member, with 1
+ * meaning no influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+ public fun apply(random: SplittableRandom, load: Double): Double {
+ val groupsSize = groupsSize
+
+ if (groupsSize == 0) {
+ return 1.0
+ }
+
+ val groups = groups
+ val targets = targets
+
+ var low = 0
+ var high = groupsSize - 1
+ var group = -1
+
+ // Perform binary search over the groups based on target load
+ while (low <= high) {
+ val mid = low + high ushr 1
+ val midGroup = groups[mid]
+ val target = targets[midGroup]
+
+ if (target < load) {
+ low = mid + 1
+ group = midGroup
+ } else if (target > load) {
+ high = mid - 1
+ } else {
+ group = midGroup
+ break
+ }
+ }
+
+ return if (group >= 0 && random.nextInt(members[group].size) == 0) {
+ scores[group]
+ } else {
+ 1.0
+ }
+ }
+
+ /**
+ * Add an active group to this member.
+ */
+ internal fun addGroup(group: Int) {
+ var groups = groups
+ val groupsSize = groupsSize
+ val pos = groups.binarySearch(group, toIndex = groupsSize)
+
+ if (pos >= 0) {
+ return
+ }
+
+ val idx = -pos - 1
+
+ if (groups.size == groupsSize) {
+ val newSize = groupsSize + (groupsSize shr 1)
+ groups = groups.copyOf(newSize)
+ this.groups = groups
+ }
+
+ groups.copyInto(groups, idx + 1, idx, groupsSize)
+ groups[idx] = group
+ this.groupsSize += 1
+ }
+
+ /**
+ * Remove an active group from this member.
+ */
+ internal fun removeGroup(group: Int) {
+ val groups = groups
+ val groupsSize = groupsSize
+ val pos = groups.binarySearch(group, toIndex = groupsSize)
+
+ if (pos < 0) {
+ return
+ }
+
+ groups.copyInto(groups, pos, pos + 1, groupsSize)
+ this.groupsSize -= 1
+ }
+
+ override fun compareTo(other: VmInterferenceMember): Int {
+ val cmp = model.hashCode().compareTo(other.model.hashCode())
+ if (cmp != 0) {
+ return cmp
+ }
+
+ return id.compareTo(other.id)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
index 977292be..018c6e3d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -22,43 +22,35 @@
package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.*
/**
* An interference model that models the resource interference between virtual machines on a host.
*
- * @param targets The target load of each group.
+ * @param members The target load of each group.
* @param scores The performance score of each group.
* @param members The members belonging to each group.
* @param membership The identifier of each key.
* @param size The number of groups.
- * @param seed The seed to use for randomly selecting the virtual machines that are affected.
*/
public class VmInterferenceModel private constructor(
- private val targets: DoubleArray,
- private val scores: DoubleArray,
private val idMapping: Map<String, Int>,
private val members: Array<IntArray>,
private val membership: Array<IntArray>,
- private val size: Int,
- seed: Long,
+ private val targets: DoubleArray,
+ private val scores: DoubleArray,
+ private val size: Int
) {
/**
- * A [SplittableRandom] used for selecting the virtual machines that are affected.
- */
- private val random = SplittableRandom(seed)
-
- /**
- * Construct a new [VmInterferenceDomain].
- */
- public fun newDomain(): VmInterferenceDomain = InterferenceDomainImpl(targets, scores, idMapping, members, membership, random)
-
- /**
- * Create a copy of this model with a different seed.
+ * Return the [VmInterferenceProfile] associated with the specified [id].
+ *
+ * @param id The identifier of the virtual machine.
+ * @return A [VmInterferenceProfile] representing the virtual machine as part of interference model or `null` if
+ * there is no profile for the virtual machine.
*/
- public fun withSeed(seed: Long): VmInterferenceModel {
- return VmInterferenceModel(targets, scores, idMapping, members, membership, size, seed)
+ public fun getProfile(id: String): VmInterferenceProfile? {
+ val intId = idMapping[id] ?: return null
+ return VmInterferenceProfile(this, intId, membership[intId], members, targets, scores)
}
public companion object {
@@ -74,11 +66,6 @@ public class VmInterferenceModel private constructor(
*/
public class Builder internal constructor() {
/**
- * The initial capacity of the builder.
- */
- private val INITIAL_CAPACITY = 256
-
- /**
* The target load of each group.
*/
private var _targets = DoubleArray(INITIAL_CAPACITY) { Double.POSITIVE_INFINITY }
@@ -126,14 +113,14 @@ public class VmInterferenceModel private constructor(
/**
* Build the [VmInterferenceModel].
*/
- public fun build(seed: Long = 0): VmInterferenceModel {
+ public fun build(): VmInterferenceModel {
val size = size
val targets = _targets
val scores = _scores
val members = _members
- val indices = Array(size) { it }
- indices.sortWith(
+ val indices = IntArray(size) { it }
+ indices.sortedWith(
Comparator { l, r ->
var cmp = targets[l].compareTo(targets[r]) // Order by target load
if (cmp != 0) {
@@ -172,13 +159,12 @@ public class VmInterferenceModel private constructor(
@Suppress("UNCHECKED_CAST")
return VmInterferenceModel(
- newTargets,
- newScores,
idMapping,
newMembers as Array<IntArray>,
membership.map { it.value.toIntArray() }.toTypedArray(),
- size,
- seed
+ newTargets,
+ newScores,
+ size
)
}
@@ -192,202 +178,12 @@ public class VmInterferenceModel private constructor(
_targets = _targets.copyOf(newSize)
_scores = _scores.copyOf(newSize)
}
- }
-
- /**
- * Internal implementation of [VmInterferenceDomain].
- */
- private class InterferenceDomainImpl(
- private val targets: DoubleArray,
- private val scores: DoubleArray,
- private val idMapping: Map<String, Int>,
- private val members: Array<IntArray>,
- private val membership: Array<IntArray>,
- private val random: SplittableRandom
- ) : VmInterferenceDomain {
- /**
- * Keys registered with this domain.
- */
- private val keys = HashMap<Int, InterferenceKeyImpl>()
-
- /**
- * The set of keys active in this domain.
- */
- private val activeKeys = ArrayList<InterferenceKeyImpl>()
-
- override fun createKey(id: String): InterferenceKey? {
- val intId = idMapping[id] ?: return null
- return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) }
- }
-
- override fun removeKey(key: InterferenceKey) {
- if (key !is InterferenceKeyImpl) {
- return
- }
-
- if (activeKeys.remove(key)) {
- computeActiveGroups(key.id)
- }
-
- keys.remove(key.id)
- }
-
- override fun join(key: InterferenceKey) {
- if (key !is InterferenceKeyImpl) {
- return
- }
-
- if (key.acquire()) {
- val pos = activeKeys.binarySearch(key)
- if (pos < 0) {
- activeKeys.add(-pos - 1, key)
- }
- computeActiveGroups(key.id)
- }
- }
- override fun leave(key: InterferenceKey) {
- if (key is InterferenceKeyImpl && key.release()) {
- activeKeys.remove(key)
- computeActiveGroups(key.id)
- }
+ private companion object {
+ /**
+ * The initial capacity of the builder.
+ */
+ const val INITIAL_CAPACITY = 256
}
-
- override fun apply(key: InterferenceKey?, load: Double): Double {
- if (key == null || key !is InterferenceKeyImpl) {
- return 1.0
- }
-
- val groups = key.groups
- val groupSize = groups.size
-
- if (groupSize == 0) {
- return 1.0
- }
-
- val targets = targets
- val scores = scores
- var low = 0
- var high = groups.size - 1
-
- var group = -1
- var score = 1.0
-
- // Perform binary search over the groups based on target load
- while (low <= high) {
- val mid = low + high ushr 1
- val midGroup = groups[mid]
- val target = targets[midGroup]
-
- if (target < load) {
- low = mid + 1
- group = midGroup
- score = scores[midGroup]
- } else if (target > load) {
- high = mid - 1
- } else {
- group = midGroup
- score = scores[midGroup]
- break
- }
- }
-
- return if (group >= 0 && random.nextInt(members[group].size) == 0) {
- score
- } else {
- 1.0
- }
- }
-
- override fun toString(): String = "VmInterferenceDomain"
-
- /**
- * Queue of participants that will be removed or added to the active groups.
- */
- private val _participants = ArrayDeque<InterferenceKeyImpl>()
-
- /**
- * (Re-)Compute the active groups.
- */
- private fun computeActiveGroups(id: Int) {
- val activeKeys = activeKeys
- val groups = membership[id]
-
- if (activeKeys.isEmpty()) {
- return
- }
-
- val members = members
- val participants = _participants
-
- for (group in groups) {
- val groupMembers = members[group]
-
- var i = 0
- var j = 0
- var intersection = 0
-
- // Compute the intersection of the group members and the current active members
- while (i < groupMembers.size && j < activeKeys.size) {
- val l = groupMembers[i]
- val rightEntry = activeKeys[j]
- val r = rightEntry.id
-
- if (l < r) {
- i++
- } else if (l > r) {
- j++
- } else {
- participants.add(rightEntry)
- intersection++
-
- i++
- j++
- }
- }
-
- while (true) {
- val participant = participants.poll() ?: break
- val participantGroups = participant.groups
- if (intersection <= 1) {
- participantGroups.remove(group)
- } else {
- val pos = participantGroups.binarySearch(group)
- if (pos < 0) {
- participantGroups.add(-pos - 1, group)
- }
- }
- }
- }
- }
- }
-
- /**
- * An interference key.
- *
- * @param id The identifier of the member.
- */
- private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> {
- /**
- * The active groups to which the key belongs.
- */
- @JvmField val groups: MutableList<Int> = ArrayList()
-
- /**
- * The number of users of the interference key.
- */
- private var refCount: Int = 0
-
- /**
- * Join the domain.
- */
- fun acquire(): Boolean = refCount++ <= 0
-
- /**
- * Leave the domain.
- */
- fun release(): Boolean = --refCount <= 0
-
- override fun compareTo(other: InterferenceKeyImpl): Int = id.compareTo(other.id)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt
new file mode 100644
index 00000000..004dbd07
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceProfile.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.kernel.interference
+
+/**
+ * A profile of a particular virtual machine describing its interference pattern with other virtual machines.
+ *
+ * @param model The model to which this profile belongs.
+ * @property id The identifier of the profile inside the model.
+ * @property membership The membership of the profile in the groups.
+ * @param members The members in the model.
+ * @param targets The targets in the model.
+ * @param scores The scores in the model.
+ */
+public class VmInterferenceProfile internal constructor(
+ private val model: VmInterferenceModel,
+ private val id: Int,
+ private val membership: IntArray,
+ private val members: Array<IntArray>,
+ private val targets: DoubleArray,
+ private val scores: DoubleArray
+) {
+ /**
+ * Create a new [VmInterferenceMember] based on this profile for the specified [domain].
+ */
+ internal fun newMember(domain: VmInterferenceDomain): VmInterferenceMember {
+ return VmInterferenceMember(domain, model, id, membership, members, targets, scores)
+ }
+
+ override fun toString(): String = "VmInterferenceProfile[id=$id]"
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt
index 7e4d7191..22dcaef4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/model/MachineModel.kt
@@ -35,4 +35,20 @@ public data class MachineModel(
public val memory: List<MemoryUnit>,
public val net: List<NetworkAdapter> = emptyList(),
public val storage: List<StorageDevice> = emptyList()
-)
+) {
+ /**
+ * Optimize the [MachineModel] by merging all resources of the same type into a single resource with the combined
+ * capacity. Such configurations can be simulated more efficiently by OpenDC.
+ */
+ public fun optimize(): MachineModel {
+ val originalCpu = cpus[0]
+ val freq = cpus.sumOf { it.frequency }
+ val processingNode = originalCpu.node.copy(coreCount = 1)
+ val processingUnits = listOf(originalCpu.copy(frequency = freq, node = processingNode))
+
+ val memorySize = memory.sumOf { it.size }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return MachineModel(processingUnits, memoryUnits)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
index 6b820e5d..e66227b5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -31,14 +31,12 @@ import kotlin.math.min
* A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s.
*
* @param usageCol The column containing the CPU usage of each fragment (in MHz).
- * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis).
* @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis).
* @param coresCol The column containing the utilized cores.
* @param size The number of fragments in the trace.
*/
public class SimTrace(
private val usageCol: DoubleArray,
- private val timestampCol: LongArray,
private val deadlineCol: LongArray,
private val coresCol: IntArray,
private val size: Int,
@@ -46,7 +44,6 @@ public class SimTrace(
init {
require(size >= 0) { "Invalid trace size" }
require(usageCol.size >= size) { "Invalid number of usage entries" }
- require(timestampCol.size >= size) { "Invalid number of timestamp entries" }
require(deadlineCol.size >= size) { "Invalid number of deadline entries" }
require(coresCol.size >= size) { "Invalid number of core entries" }
}
@@ -59,19 +56,17 @@ public class SimTrace(
public fun ofFragments(fragments: List<SimTraceFragment>): SimTrace {
val size = fragments.size
val usageCol = DoubleArray(size)
- val timestampCol = LongArray(size)
val deadlineCol = LongArray(size)
val coresCol = IntArray(size)
for (i in fragments.indices) {
val fragment = fragments[i]
usageCol[i] = fragment.usage
- timestampCol[i] = fragment.timestamp
deadlineCol[i] = fragment.timestamp + fragment.duration
coresCol[i] = fragment.cores
}
- return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ return SimTrace(usageCol, deadlineCol, coresCol, size)
}
/**
@@ -81,19 +76,17 @@ public class SimTrace(
public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace {
val size = fragments.size
val usageCol = DoubleArray(size)
- val timestampCol = LongArray(size)
val deadlineCol = LongArray(size)
val coresCol = IntArray(size)
for (i in fragments.indices) {
val fragment = fragments[i]
usageCol[i] = fragment.usage
- timestampCol[i] = fragment.timestamp
deadlineCol[i] = fragment.timestamp + fragment.duration
coresCol[i] = fragment.cores
}
- return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ return SimTrace(usageCol, deadlineCol, coresCol, size)
}
/**
@@ -108,25 +101,9 @@ public class SimTrace(
*
* @param cpu The [ProcessingUnit] for which to create the source.
* @param offset The time offset to use for the trace.
- * @param fillMode The [FillMode] for filling missing data.
*/
- public fun newSource(cpu: ProcessingUnit, offset: Long, fillMode: FillMode = FillMode.None): FlowSource {
- return CpuConsumer(cpu, offset, fillMode, usageCol, timestampCol, deadlineCol, coresCol, size)
- }
-
- /**
- * An enumeration describing the modes for filling missing data.
- */
- public enum class FillMode {
- /**
- * When a gap in the trace data occurs, the CPU usage will be set to zero.
- */
- None,
-
- /**
- * When a gap in the trace data occurs, the previous CPU usage will be used.
- */
- Previous
+ public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource {
+ return CpuConsumer(cpu, offset, usageCol, deadlineCol, coresCol, size)
}
/**
@@ -137,7 +114,6 @@ public class SimTrace(
* The columns of the trace.
*/
private var usageCol: DoubleArray = DoubleArray(16)
- private var timestampCol: LongArray = LongArray(16)
private var deadlineCol: LongArray = LongArray(16)
private var coresCol: IntArray = IntArray(16)
@@ -150,25 +126,23 @@ public class SimTrace(
* Add the specified [SimTraceFragment] to the trace.
*/
public fun add(fragment: SimTraceFragment) {
- add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores)
+ add(fragment.timestamp + fragment.duration, fragment.usage, fragment.cores)
}
/**
* Add a fragment to the trace.
*
- * @param timestamp Timestamp at which the fragment starts (in epoch millis).
* @param deadline Timestamp at which the fragment ends (in epoch millis).
* @param usage CPU usage of this fragment.
* @param cores Number of cores used.
*/
- public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ public fun add(deadline: Long, usage: Double, cores: Int) {
val size = size
if (size == usageCol.size) {
grow()
}
- timestampCol[size] = timestamp
deadlineCol[size] = deadline
usageCol[size] = usage
coresCol[size] = cores
@@ -184,7 +158,6 @@ public class SimTrace(
val newSize = arraySize + (arraySize shr 1)
usageCol = usageCol.copyOf(newSize)
- timestampCol = timestampCol.copyOf(newSize)
deadlineCol = deadlineCol.copyOf(newSize)
coresCol = coresCol.copyOf(newSize)
}
@@ -193,7 +166,7 @@ public class SimTrace(
* Construct the immutable [SimTrace].
*/
public fun build(): SimTrace {
- return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ return SimTrace(usageCol, deadlineCol, coresCol, size)
}
}
@@ -203,9 +176,7 @@ public class SimTrace(
private class CpuConsumer(
cpu: ProcessingUnit,
private val offset: Long,
- private val fillMode: FillMode,
private val usageCol: DoubleArray,
- private val timestampCol: LongArray,
private val deadlineCol: LongArray,
private val coresCol: IntArray,
private val size: Int
@@ -236,16 +207,6 @@ public class SimTrace(
}
_idx = idx
- val timestamp = timestampCol[idx]
-
- // There is a gap in the trace, since the next fragment starts in the future.
- if (timestamp > nowOffset) {
- when (fillMode) {
- FillMode.None -> conn.push(0.0) // Reset rate to zero
- FillMode.Previous -> {} // Keep previous rate
- }
- return timestamp - nowOffset
- }
val cores = min(coreCount, coresCol[idx])
val usage = usageCol[idx]
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 91855e8d..ddf8cf14 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -43,11 +43,12 @@ import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
+import java.util.*
/**
* Test suite for the [SimHypervisor] class.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimFairShareHypervisorTest {
private lateinit var model: MachineModel
@@ -76,9 +77,9 @@ internal class SimFairShareHypervisorTest {
),
)
- val platform = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(platform, null, PerformanceScalingGovernor(), null)
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), PerformanceScalingGovernor())
launch {
machine.runWorkload(hypervisor)
@@ -125,11 +126,9 @@ internal class SimFairShareHypervisorTest {
)
)
- val platform = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimFairShareHypervisor(platform, null, null, null)
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
launch {
machine.runWorkload(hypervisor)
@@ -166,9 +165,9 @@ internal class SimFairShareHypervisorTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val platform = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(platform, null, null, null)
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
assertDoesNotThrow {
launch {
@@ -193,11 +192,9 @@ internal class SimFairShareHypervisorTest {
.addGroup(targetLoad = 0.1, score = 0.8, members = setOf("a", "n"))
.build()
- val platform = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimFairShareHypervisor(platform, null, null, interferenceModel.newDomain())
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.maxMinMultiplexer(), SplittableRandom(1), null)
val duration = 5 * 60L
val workloadA =
@@ -225,12 +222,12 @@ internal class SimFairShareHypervisorTest {
coroutineScope {
launch {
- val vm = hypervisor.newMachine(model, "a")
- vm.runWorkload(workloadA)
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadA, meta = mapOf("interference-model" to interferenceModel.getProfile("a")!!))
hypervisor.removeMachine(vm)
}
- val vm = hypervisor.newMachine(model, "b")
- vm.runWorkload(workloadB)
+ val vm = hypervisor.newMachine(model)
+ vm.runWorkload(workloadB, meta = mapOf("interference-model" to interferenceModel.getProfile("b")!!))
hypervisor.removeMachine(vm)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 823a0ae3..df6755f1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.compute.kernel
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.*
@@ -40,11 +39,12 @@ import org.opendc.simulator.compute.runWorkload
import org.opendc.simulator.compute.workload.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.mux.FlowMultiplexerFactory
+import java.util.*
/**
- * A test suite for the [SimSpaceSharedHypervisor].
+ * A test suite for a space-shared [SimHypervisor].
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class SimSpaceSharedHypervisorTest {
private lateinit var machineModel: MachineModel
@@ -75,7 +75,7 @@ internal class SimSpaceSharedHypervisorTest {
val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
val vm = hypervisor.newMachine(machineModel)
@@ -97,7 +97,7 @@ internal class SimSpaceSharedHypervisorTest {
val workload = SimRuntimeWorkload(duration)
val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
yield()
@@ -118,10 +118,8 @@ internal class SimSpaceSharedHypervisorTest {
val duration = 5 * 60L * 1000
val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
yield()
@@ -139,10 +137,8 @@ internal class SimSpaceSharedHypervisorTest {
fun testTwoWorkloads() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val engine = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
yield()
@@ -169,7 +165,7 @@ internal class SimSpaceSharedHypervisorTest {
fun testConcurrentWorkloadFails() = runBlockingSimulation {
val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimSpaceSharedHypervisor(engine, null, null)
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
yield()
@@ -189,11 +185,9 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runBlockingSimulation {
- val interpreter = FlowEngine(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(interpreter, null, null)
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimHypervisor(engine, FlowMultiplexerFactory.forwardingMultiplexer(), SplittableRandom(1), null)
launch { machine.runWorkload(hypervisor) }
yield()
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index a717ae6e..d8ad7978 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -42,11 +42,6 @@ public interface FlowCounters {
public val remaining: Double
/**
- * The accumulated flow lost due to interference between sources.
- */
- public val interference: Double
-
- /**
* Reset the flow counters.
*/
public fun reset()
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 0ad18f6a..6fa2971a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -258,6 +258,6 @@ public class FlowForwarder(
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
- counters.increment(work, actualWork, (total - actualWork), 0.0)
+ counters.increment(work, actualWork, (total - actualWork))
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index d0324ce8..ee8cd739 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -154,7 +154,7 @@ public class FlowSink(
val work = capacity * deltaS
val actualWork = ctx.rate * deltaS
- counters.increment(work, actualWork, (total - actualWork), 0.0)
+ counters.increment(work, actualWork, (total - actualWork))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
deleted file mode 100644
index aa2713b6..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.opendc.simulator.flow.interference
-
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur
- * performance variability due to operating on the same resource and therefore causing interference.
- */
-public interface InterferenceDomain {
- /**
- * Compute the performance score of a participant in this interference domain.
- *
- * @param key The participant to obtain the score of or `null` if the participant has no key.
- * @param load The overall load on the interference domain.
- * @return A score representing the performance score to be applied to the resource consumer, with 1
- * meaning no influence, <1 means that performance degrades, and >1 means that performance improves.
- */
- public fun apply(key: InterferenceKey?, load: Double): Double
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
deleted file mode 100644
index d28ebde5..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.interference
-
-/**
- * A key that uniquely identifies a participant of an interference domain.
- */
-public interface InterferenceKey
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
index d990dc61..c320a362 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
@@ -34,23 +34,20 @@ public class MutableFlowCounters : FlowCounters {
get() = _counters[1]
override val remaining: Double
get() = _counters[2]
- override val interference: Double
- get() = _counters[3]
- private val _counters = DoubleArray(4)
+ private val _counters = DoubleArray(3)
override fun reset() {
_counters.fill(0.0)
}
- public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) {
+ public fun increment(demand: Double, actual: Double, remaining: Double) {
val counters = _counters
counters[0] += demand
counters[1] += actual
counters[2] += remaining
- counters[3] += interference
}
override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 5f198944..8752c559 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -25,13 +25,22 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.FlowConsumer
import org.opendc.simulator.flow.FlowCounters
import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.interference.InterferenceKey
/**
* A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
*/
public interface FlowMultiplexer {
/**
+ * The maximum number of inputs supported by the multiplexer.
+ */
+ public val maxInputs: Int
+
+ /**
+ * The maximum number of outputs supported by the multiplexer.
+ */
+ public val maxOutputs: Int
+
+ /**
* The inputs of the multiplexer that can be used to consume sources.
*/
public val inputs: Set<FlowConsumer>
@@ -63,18 +72,15 @@ public interface FlowMultiplexer {
/**
* Create a new input on this multiplexer with a coupled capacity.
- *
- * @param key The key of the interference member to which the input belongs.
*/
- public fun newInput(key: InterferenceKey? = null): FlowConsumer
+ public fun newInput(): FlowConsumer
/**
* Create a new input on this multiplexer with the specified [capacity].
*
* @param capacity The capacity of the input.
- * @param key The key of the interference member to which the input belongs.
*/
- public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer
+ public fun newInput(capacity: Double): FlowConsumer
/**
* Remove [input] from this multiplexer.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
new file mode 100644
index 00000000..a863e3ad
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.FlowConvergenceListener
+import org.opendc.simulator.flow.FlowEngine
+
+/**
+ * Factory interface for a [FlowMultiplexer] implementation.
+ */
+public fun interface FlowMultiplexerFactory {
+ /**
+ * Construct a new [FlowMultiplexer] using the specified [engine] and [listener].
+ */
+ public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer
+
+ public companion object {
+ /**
+ * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer].
+ */
+ private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) }
+
+ /**
+ * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer].
+ */
+ private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) }
+
+ /**
+ * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances.
+ */
+ @JvmStatic
+ public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY
+
+ /**
+ * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances.
+ */
+ @JvmStatic
+ public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 1d7d22ef..c50e9bbc 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
-import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.ArrayDeque
/**
@@ -38,6 +37,12 @@ public class ForwardingFlowMultiplexer(
private val engine: FlowEngine,
private val listener: FlowConvergenceListener? = null
) : FlowMultiplexer, FlowConvergenceListener {
+
+ override val maxInputs: Int
+ get() = _outputs.size
+
+ override val maxOutputs: Int = Int.MAX_VALUE
+
override val inputs: Set<FlowConsumer>
get() = _inputs
private val _inputs = mutableSetOf<Input>()
@@ -54,8 +59,6 @@ public class ForwardingFlowMultiplexer(
get() = _outputs.sumOf { it.forwarder.counters.actual }
override val remaining: Double
get() = _outputs.sumOf { it.forwarder.counters.remaining }
- override val interference: Double
- get() = _outputs.sumOf { it.forwarder.counters.interference }
override fun reset() {
for (output in _outputs) {
@@ -75,14 +78,14 @@ public class ForwardingFlowMultiplexer(
override val capacity: Double
get() = _outputs.sumOf { it.forwarder.capacity }
- override fun newInput(key: InterferenceKey?): FlowConsumer {
+ override fun newInput(): FlowConsumer {
val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
val input = Input(output)
_inputs += input
return input
}
- override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key)
+ override fun newInput(capacity: Double): FlowConsumer = newInput()
override fun removeInput(input: FlowConsumer) {
if (!_inputs.remove(input)) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index cc831862..f2a4c1a4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -23,11 +23,8 @@
package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
-import org.opendc.simulator.flow.interference.InterferenceDomain
-import org.opendc.simulator.flow.interference.InterferenceKey
import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.max
import kotlin.math.min
/**
@@ -35,13 +32,16 @@ import kotlin.math.min
*
* @param engine The [FlowEngine] to drive the flow simulation.
* @param parent The parent flow system of the multiplexer.
- * @param interferenceDomain The interference domain of the multiplexer.
*/
public class MaxMinFlowMultiplexer(
private val engine: FlowEngine,
- parent: FlowConvergenceListener? = null,
- private val interferenceDomain: InterferenceDomain? = null
+ parent: FlowConvergenceListener? = null
) : FlowMultiplexer {
+
+ override val maxInputs: Int = Int.MAX_VALUE
+
+ override val maxOutputs: Int = Int.MAX_VALUE
+
/**
* The inputs of the multiplexer.
*/
@@ -85,16 +85,16 @@ public class MaxMinFlowMultiplexer(
*/
private val scheduler = Scheduler(engine, parent)
- override fun newInput(key: InterferenceKey?): FlowConsumer {
- return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key)
+ override fun newInput(): FlowConsumer {
+ return newInput(isCoupled = true, Double.POSITIVE_INFINITY)
}
- override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer {
- return newInput(isCoupled = false, capacity, key)
+ override fun newInput(capacity: Double): FlowConsumer {
+ return newInput(isCoupled = false, capacity)
}
- private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer {
- val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity)
+ private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer {
+ val provider = Input(engine, scheduler, isCoupled, initialCapacity)
_inputs.add(provider)
return provider
}
@@ -499,8 +499,7 @@ public class MaxMinFlowMultiplexer(
counters.increment(
demand = demand * deltaS,
actual = rate * deltaS,
- remaining = (previousCapacity - rate) * deltaS,
- interference = 0.0
+ remaining = (previousCapacity - rate) * deltaS
)
}
}
@@ -511,8 +510,6 @@ public class MaxMinFlowMultiplexer(
private class Input(
private val engine: FlowEngine,
private val scheduler: Scheduler,
- private val interferenceDomain: InterferenceDomain?,
- @JvmField val key: InterferenceKey?,
@JvmField val isCoupled: Boolean,
initialCapacity: Double,
) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
@@ -693,24 +690,15 @@ public class MaxMinFlowMultiplexer(
return
}
- // Compute the performance penalty due to flow interference
- val perfScore = if (interferenceDomain != null) {
- val load = scheduler.rate / scheduler.capacity
- interferenceDomain.apply(key, load)
- } else {
- 1.0
- }
-
val actualRate = actualRate
val deltaS = delta * D_MS_TO_S
val demand = limit * deltaS
val actual = actualRate * deltaS
val remaining = (_capacity - actualRate) * deltaS
- val interference = actual * max(0.0, 1 - perfScore)
- _counters.increment(demand, actual, remaining, interference)
- scheduler.counters.increment(0.0, 0.0, 0.0, interference)
+ _counters.increment(demand, actual, remaining)
+ scheduler.counters.increment(0.0, 0.0, 0.0)
}
}