diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-29 23:56:16 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:39 +0200 |
| commit | 4cc1d40d421c8736f8b21b360b61d6b065158b7a (patch) | |
| tree | cb2de79a72881eb0b2dee6a82dd498faba5dd26d /opendc-simulator | |
| parent | dd605ab1f70fef1fbbed848e8ebbd6b231622273 (diff) | |
refactor(simulator): Migrate to flow-based simulation
This change renames the `opendc-simulator-resources` module into the
`opendc-simulator-flow` module to indicate that the core simulation
model of OpenDC is based around modelling and simulating flows.
Previously, the distinction between resource consumer and provider, and
input and output caused some confusion. By switching to a flow-based
model, this distinction is now clear (as in, the water flows from source
to consumer/sink).
Diffstat (limited to 'opendc-simulator')
82 files changed, 1711 insertions, 1765 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index 7d06ee62..e2290a14 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -31,7 +31,7 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 88ad7286..c57919c1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -36,7 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import org.openjdk.jmh.annotations.* import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -48,13 +48,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var interpreter: SimResourceInterpreter + private lateinit var engine: FlowEngine private lateinit var machineModel: MachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) + engine = FlowEngine(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -80,7 +80,7 @@ class SimMachineBenchmarks { fun benchmarkBareMetal(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace)) } @@ -90,9 +90,9 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } @@ -111,9 +111,9 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(interpreter) + val hypervisor = SimFairShareHypervisor(engine) launch { machine.run(hypervisor) } @@ -132,9 +132,9 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(interpreter) + val hypervisor = SimFairShareHypervisor(engine) launch { machine.run(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index f9db048d..6a62d8a5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -30,22 +30,22 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* import kotlin.coroutines.Continuation import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. * - * @param interpreter The interpreter to manage the machine's resources. + * @param engine The engine to manage the machine's resources. * @param parent The parent simulation system. * @param model The model of the machine. */ public abstract class SimAbstractMachine( - protected val interpreter: SimResourceInterpreter, - final override val parent: SimResourceSystem?, + protected val engine: FlowEngine, + final override val parent: FlowSystem?, final override val model: MachineModel -) : SimMachine, SimResourceSystem { +) : SimMachine, FlowSystem { /** * The resources allocated for this machine. */ @@ -54,17 +54,17 @@ public abstract class SimAbstractMachine( /** * The memory interface of the machine. */ - public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory) + public val memory: SimMemory = Memory(FlowSink(engine, model.memory.sumOf { it.size }.toDouble()), model.memory) /** * The network interfaces available to the machine. */ - public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) } + public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(engine, adapter, i) } /** * The network interfaces available to the machine. */ - public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) } + public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(engine, device, i) } /** * The peripherals of the machine. @@ -82,7 +82,7 @@ public abstract class SimAbstractMachine( private var cont: Continuation<Unit>? = null /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { check(!isTerminated) { "Machine is terminated" } @@ -96,14 +96,14 @@ public abstract class SimAbstractMachine( // Cancel all cpus on cancellation cont.invokeOnCancellation { this.cont = null - interpreter.batch { + engine.batch { for (cpu in cpus) { cpu.cancel() } } } - interpreter.batch { workload.onStart(ctx) } + engine.batch { workload.onStart(ctx) } } } @@ -120,7 +120,7 @@ public abstract class SimAbstractMachine( * Cancel the workload that is currently running on the machine. */ private fun cancel() { - interpreter.batch { + engine.batch { for (cpu in cpus) { cpu.cancel() } @@ -137,8 +137,8 @@ public abstract class SimAbstractMachine( * The execution context in which the workload runs. */ private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val interpreter: SimResourceInterpreter - get() = this@SimAbstractMachine.interpreter + override val engine: FlowEngine + get() = this@SimAbstractMachine.engine override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus @@ -154,7 +154,7 @@ public abstract class SimAbstractMachine( /** * The [SimMemory] implementation for a machine. */ - private class Memory(source: SimResourceSource, override val models: List<MemoryUnit>) : SimMemory, SimResourceProvider by source { + private class Memory(source: FlowSink, override val models: List<MemoryUnit>) : SimMemory, FlowConsumer by source { override fun toString(): String = "SimAbstractMachine.Memory" } @@ -162,6 +162,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( + private val engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { @@ -169,18 +170,18 @@ public abstract class SimAbstractMachine( override val bandwidth: Double = model.bandwidth - override val provider: SimResourceProvider + override val provider: FlowConsumer get() = _rx - override fun createConsumer(): SimResourceConsumer = _tx + override fun createConsumer(): FlowSource = _tx - override val tx: SimResourceProvider + override val tx: FlowConsumer get() = _tx - private val _tx = SimResourceForwarder() + private val _tx = FlowForwarder(engine) - override val rx: SimResourceConsumer + override val rx: FlowSource get() = _rx - private val _rx = SimResourceForwarder() + private val _rx = FlowForwarder(engine) override fun toString(): String = "SimAbstractMachine.NetworkAdapterImpl[name=$name,bandwidth=$bandwidth]" } @@ -189,7 +190,7 @@ public abstract class SimAbstractMachine( * The [SimStorageInterface] implementation for a machine. */ private class StorageDeviceImpl( - interpreter: SimResourceInterpreter, + engine: FlowEngine, model: StorageDevice, index: Int ) : SimStorageInterface { @@ -197,9 +198,9 @@ public abstract class SimAbstractMachine( override val capacity: Double = model.capacity - override val read: SimResourceProvider = SimResourceSource(model.readBandwidth, interpreter) + override val read: FlowConsumer = FlowSink(engine, model.readBandwidth) - override val write: SimResourceProvider = SimResourceSource(model.writeBandwidth, interpreter) + override val write: FlowConsumer = FlowSink(engine, model.writeBandwidth) override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 639ca450..37cf282b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -26,8 +26,8 @@ import org.opendc.simulator.compute.device.SimPsu import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerDriver -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.FlowEngine /** * A simulated bare-metal machine that is able to run a single workload. @@ -35,19 +35,19 @@ import org.opendc.simulator.resources.SimResourceInterpreter * A [SimBareMetalMachine] is a stateful object, and you should be careful when operating this object concurrently. For * example, the class expects only a single concurrent call to [run]. * - * @param interpreter The [SimResourceInterpreter] to drive the simulation. + * @param engine The [FlowEngine] to drive the simulation. * @param model The machine model to simulate. * @param powerDriver The power driver to use. * @param psu The power supply of the machine. * @param parent The parent simulation system. */ public class SimBareMetalMachine( - interpreter: SimResourceInterpreter, + engine: FlowEngine, model: MachineModel, powerDriver: PowerDriver, public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)), - parent: SimResourceSystem? = null, -) : SimAbstractMachine(interpreter, parent, model) { + parent: FlowSystem? = null, +) : SimAbstractMachine(engine, parent, model) { /** * The power draw of the machine onto the PSU. */ @@ -58,7 +58,7 @@ public class SimBareMetalMachine( * The processing units of the machine. */ override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu -> - Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu) + Cpu(FlowSink(engine, cpu.frequency, this@SimBareMetalMachine), cpu) } /** @@ -78,9 +78,9 @@ public class SimBareMetalMachine( * A [SimProcessingUnit] of a bare-metal machine. */ private class Cpu( - private val source: SimResourceSource, + private val source: FlowSink, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceProvider by source { + ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity set(value) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index d8dd8205..ab0b56ae 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -41,7 +41,7 @@ public interface SimMachine : AutoCloseable { public val peripherals: List<SimPeripheral> /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 6996a30d..1317f728 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * A simulated execution context in which a bootable image runs. This interface represents the @@ -31,9 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter */ public interface SimMachineContext : AutoCloseable { /** - * The resource interpreter that simulates the machine. + * The [FlowEngine] that simulates the machine. */ - public val interpreter: SimResourceInterpreter + public val engine: FlowEngine /** * The metadata associated with the context. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt index 6623df23..b1aef495 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt @@ -23,12 +23,12 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer /** * An interface to control the memory usage of simulated workloads. */ -public interface SimMemory : SimResourceProvider { +public interface SimMemory : FlowConsumer { /** * The models representing the static information of the memory units supporting this interface. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt index 1ac126ae..660b2871 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowSource /** * A firmware interface to a network adapter. @@ -42,10 +42,10 @@ public interface SimNetworkInterface { /** * The resource provider for the transmit channel of the network interface. */ - public val tx: SimResourceProvider + public val tx: FlowConsumer /** * The resource consumer for the receive channel of the network interface. */ - public val rx: SimResourceConsumer + public val rx: FlowSource } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt index 93c9ddfa..c9f36ece 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt @@ -23,12 +23,12 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer /** * A simulated processing unit. */ -public interface SimProcessingUnit : SimResourceProvider { +public interface SimProcessingUnit : FlowConsumer { /** * The capacity of the processing unit, which can be adjusted by the workload if supported by the machine. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt index 21a801f1..3d648671 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer /** * A firmware interface to a storage device. @@ -41,10 +41,10 @@ public interface SimStorageInterface { /** * The resource provider for the read operations of the storage device. */ - public val read: SimResourceProvider + public val read: FlowConsumer /** * The resource consumer for the write operation of the storage device. */ - public val write: SimResourceProvider + public val write: FlowConsumer } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 6e6e590f..b05d8ad9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -23,10 +23,10 @@ package org.opendc.simulator.compute.device import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource import org.opendc.simulator.power.SimPowerInlet -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent import java.util.* /** @@ -54,7 +54,7 @@ public class SimPsu( /** * The consumer context. */ - private var _ctx: SimResourceContext? = null + private var _ctx: FlowConnection? = null /** * The driver that is connected to the PSU. @@ -69,7 +69,7 @@ public class SimPsu( * Update the power draw of the PSU. */ public fun update() { - _ctx?.interrupt() + _ctx?.pull() } /** @@ -81,18 +81,18 @@ public class SimPsu( update() } - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun createConsumer(): FlowSource = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) - ctx.push(powerDraw) + conn.push(powerDraw) return Long.MAX_VALUE } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> _ctx = ctx - SimResourceEvent.Run -> _powerDraw = ctx.speed - SimResourceEvent.Exit -> _ctx = null + FlowEvent.Start -> _ctx = conn + FlowEvent.Converge -> _powerDraw = conn.rate + FlowEvent.Exit -> _ctx = null else -> {} } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index cf9e3230..b145eefc 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,17 +28,17 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.SimResourceSwitch +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer /** * Abstract implementation of the [SimHypervisor] interface. * - * @param interpreter The resource interpreter to use. + * @param engine The [FlowEngine] to drive the simulation. * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. */ public abstract class SimAbstractHypervisor( - private val interpreter: SimResourceInterpreter, + protected val engine: FlowEngine, private val scalingGovernor: ScalingGovernor? = null, protected val interferenceDomain: VmInterferenceDomain? = null ) : SimHypervisor { @@ -50,7 +50,7 @@ public abstract class SimAbstractHypervisor( /** * The resource switch to use. */ - private lateinit var switch: SimResourceSwitch + private lateinit var mux: FlowMultiplexer /** * The virtual machines running on this hypervisor. @@ -62,8 +62,8 @@ public abstract class SimAbstractHypervisor( /** * The resource counters associated with the hypervisor. */ - public override val counters: SimResourceCounters - get() = switch.counters + public override val counters: FlowCounters + get() = mux.counters /** * The scaling governors attached to the physical CPUs backing this hypervisor. @@ -71,14 +71,14 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf<ScalingGovernor.Logic>() /** - * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. + * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs. */ - public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch + public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer /** * Check whether the specified machine model fits on this hypervisor. */ - public abstract fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean + public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean /** * Trigger the governors to recompute the scaling limits. @@ -91,7 +91,7 @@ public abstract class SimAbstractHypervisor( /* SimHypervisor */ override fun canFit(model: MachineModel): Boolean { - return canFit(model, switch) + return canFit(model, mux) } override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine { @@ -104,7 +104,7 @@ public abstract class SimAbstractHypervisor( /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx - switch = createSwitch(ctx) + mux = createMultiplexer(ctx) for (cpu in ctx.cpus) { val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) @@ -113,7 +113,7 @@ public abstract class SimAbstractHypervisor( governor.onStart() } - switch.addInput(cpu) + mux.addOutput(cpu) } } @@ -122,7 +122,7 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) { + private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) { /** * The interference key of this virtual machine. */ @@ -131,7 +131,7 @@ public abstract class SimAbstractHypervisor( /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(switch, switch.newOutput(interferenceKey), it) } + override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } override fun close() { super.close() @@ -153,10 +153,10 @@ public abstract class SimAbstractHypervisor( * A [SimProcessingUnit] of a virtual machine. */ private class VCpu( - private val switch: SimResourceSwitch, - private val source: SimResourceProvider, + private val switch: FlowMultiplexer, + private val source: FlowConsumer, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceProvider by source { + ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity set(_) { @@ -169,7 +169,7 @@ public abstract class SimAbstractHypervisor( * Close the CPU */ fun close() { - switch.removeOutput(source) + switch.removeInput(source) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 3b44292d..36ab7c1c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -28,39 +28,39 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSwitch -import org.opendc.simulator.resources.SimResourceSwitchMaxMin -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] * concurrently using weighted fair sharing. * - * @param interpreter The interpreter to manage the machine's resources. + * @param engine The [FlowEngine] to manage the machine's resources. * @param parent The parent simulation system. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. * @param interferenceDomain The resource interference domain to which the hypervisor belongs. * @param listener The hypervisor listener to use. */ public class SimFairShareHypervisor( - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, + engine: FlowEngine, + private val parent: FlowSystem? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, private val listener: SimHypervisor.Listener? = null -) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) { +) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) { - override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true + override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { + override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer { return SwitchSystem(ctx).switch } - private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem { - val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain) + private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowSystem { + val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain) - override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent + override val parent: FlowSystem? = this@SimFairShareHypervisor.parent private var lastCpuUsage = 0.0 private var lastCpuDemand = 0.0 @@ -87,8 +87,8 @@ public class SimFairShareHypervisor( } lastReport = timestamp - lastCpuDemand = switch.inputs.sumOf { it.demand } - lastCpuUsage = switch.inputs.sumOf { it.speed } + lastCpuDemand = switch.outputs.sumOf { it.demand } + lastCpuUsage = switch.outputs.sumOf { it.rate } lastDemand = counters.demand lastActual = counters.actual lastOvercommit = counters.overcommit diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index 8d0592ec..bfa099fb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -34,13 +34,13 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" override fun create( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem?, + engine: FlowEngine, + parent: FlowSystem?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? ): SimHypervisor = SimFairShareHypervisor( - interpreter, + engine, parent, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 3b49d515..1b11ca6b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceCounters +import org.opendc.simulator.flow.FlowCounters /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload @@ -40,7 +40,7 @@ public interface SimHypervisor : SimWorkload { /** * The resource counters associated with the hypervisor. */ - public val counters: SimResourceCounters + public val counters: FlowCounters /** * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index b307a34d..97f07097 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem /** * A service provider interface for constructing a [SimHypervisor]. @@ -43,8 +43,8 @@ public interface SimHypervisorProvider { * Create a [SimHypervisor] instance with the specified [listener]. */ public fun create( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null, + engine: FlowEngine, + parent: FlowSystem? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, listener: SimHypervisor.Listener? = null diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index ac1c0250..883e0d82 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -24,19 +24,19 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSwitch -import org.opendc.simulator.resources.SimResourceSwitchExclusive +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) { - override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean { - return switch.inputs.size - switch.outputs.size >= model.cpus.size +public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) { + override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean { + return switch.outputs.size - switch.inputs.size >= model.cpus.size } - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SimResourceSwitchExclusive() + override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer { + return ForwardingFlowMultiplexer(engine) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index 3906cb9a..7869d72d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -34,10 +34,10 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" override fun create( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem?, + engine: FlowEngine, + parent: FlowSystem?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? - ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) + ): SimHypervisor = SimSpaceSharedHypervisor(engine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 1801fcd0..b737d61a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.resources.interference.InterferenceDomain -import org.opendc.simulator.resources.interference.InterferenceKey +import org.opendc.simulator.flow.interference.InterferenceDomain +import org.opendc.simulator.flow.interference.InterferenceKey /** * The interference domain of a hypervisor. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index c2e00c8e..b3d72507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.resources.interference.InterferenceKey +import org.opendc.simulator.flow.interference.InterferenceKey import java.util.* /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt index 6577fbfc..f71446f8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt @@ -46,7 +46,7 @@ public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver { for (cpu in cpus) { targetFreq = max(cpu.capacity, targetFreq) - totalSpeed += cpu.speed + totalSpeed += cpu.rate } val maxFreq = states.lastKey() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt index bf7aeff1..34e91c35 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt @@ -37,7 +37,7 @@ public class SimplePowerDriver(private val model: PowerModel) : PowerDriver { for (cpu in cpus) { targetFreq += cpu.capacity - totalSpeed += cpu.speed + totalSpeed += cpu.rate } return model.computePower(totalSpeed / targetFreq) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index a01fa20c..99f4a1e1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.source.FixedFlowSource /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -44,7 +44,7 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { - cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization))) + cpu.startConsumer(lifecycle.waitFor(FixedFlowSource(flops.toDouble() / ctx.cpus.size, utilization))) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 4ee56689..2ef3bc43 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.source.FixedFlowSource /** * A [SimWorkload] that models application execution as a single duration. @@ -44,7 +44,7 @@ public class SimRuntimeWorkload( val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { val limit = cpu.capacity * utilization - cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization))) + cpu.startConsumer(lifecycle.waitFor(FixedFlowSource((limit / 1000) * duration, utilization))) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index dd582bb2..a877dac1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource import kotlin.math.min /** @@ -78,12 +78,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val return now >= timestamp + duration } - private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + private inner class Consumer(val cpu: ProcessingUnit) : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val fragment = pullFragment(now) if (fragment == null) { - ctx.close() + conn.close() return Long.MAX_VALUE } @@ -91,7 +91,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val // Fragment is in the future if (timestamp > now) { - ctx.push(0.0) + conn.push(0.0) return timestamp - now } @@ -103,7 +103,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val val deadline = timestamp + fragment.duration val duration = deadline - now - ctx.push(if (cpu.id < cores && usage > 0.0) usage else 0.0) + conn.push(if (cpu.id < cores && usage > 0.0) usage else 0.0) return duration } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index 5dd18271..dabe60e0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -23,9 +23,9 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource /** * A helper class to manage the lifecycle of a [SimWorkload] @@ -34,27 +34,27 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf<SimResourceConsumer>() + private val waiting = mutableSetOf<FlowSource>() /** * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer { + public fun waitFor(consumer: FlowSource): FlowSource { waiting.add(consumer) - return object : SimResourceConsumer by consumer { - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + return object : FlowSource by consumer { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { try { - consumer.onEvent(ctx, event) + consumer.onEvent(conn, now, event) } finally { - if (event == SimResourceEvent.Exit) { + if (event == FlowEvent.Exit) { complete(consumer) } } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { try { - consumer.onFailure(ctx, cause) + consumer.onFailure(conn, cause) } finally { complete(consumer) } @@ -65,9 +65,9 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { } /** - * Complete the specified [SimResourceConsumer]. + * Complete the specified [FlowSource]. */ - private fun complete(consumer: SimResourceConsumer) { + private fun complete(consumer: FlowSource) { if (waiting.remove(consumer) && waiting.isEmpty()) { ctx.close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 81268879..0bb24ed8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -34,10 +34,10 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.compute.workload.SimWorkloadLifecycle import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.source.FixedFlowSource import org.opendc.simulator.network.SimNetworkSink import org.opendc.simulator.power.SimPowerSource -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * Test suite for the [SimBareMetalMachine] class. @@ -60,7 +60,7 @@ class SimMachineTest { @Test fun testFlopsWorkload() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -83,7 +83,7 @@ class SimMachineTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -100,13 +100,13 @@ class SimMachineTest { @Test fun testPower() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(LinearPowerModel(100.0, 50.0)) ) - val source = SimPowerSource(interpreter, capacity = 1000.0) + val source = SimPowerSource(engine, capacity = 1000.0) source.connect(machine.psu) try { @@ -125,7 +125,7 @@ class SimMachineTest { @Test fun testCapacityClamp() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -151,7 +151,7 @@ class SimMachineTest { @Test fun testMemory() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -171,7 +171,7 @@ class SimMachineTest { @Test fun testMemoryUsage() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -180,7 +180,7 @@ class SimMachineTest { machine.run(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) - ctx.memory.startConsumer(lifecycle.waitFor(SimWorkConsumer(ctx.memory.capacity, utilization = 0.8))) + ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) } }) @@ -192,22 +192,22 @@ class SimMachineTest { @Test fun testNetUsage() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val adapter = (machine.peripherals[0] as SimNetworkAdapter) - adapter.connect(SimNetworkSink(interpreter, adapter.bandwidth)) + adapter.connect(SimNetworkSink(engine, adapter.bandwidth)) try { machine.run(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) val iface = ctx.net[0] - iface.tx.startConsumer(lifecycle.waitFor(SimWorkConsumer(iface.bandwidth, utilization = 0.8))) + iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) } }) @@ -219,9 +219,9 @@ class SimMachineTest { @Test fun testDiskReadUsage() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -231,7 +231,7 @@ class SimMachineTest { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) val disk = ctx.storage[0] - disk.read.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.read.capacity, utilization = 0.8))) + disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) } }) @@ -243,9 +243,9 @@ class SimMachineTest { @Test fun testDiskWriteUsage() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -255,7 +255,7 @@ class SimMachineTest { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) val disk = ctx.storage[0] - disk.write.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.write.capacity, utilization = 0.8))) + disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) } }) @@ -268,7 +268,7 @@ class SimMachineTest { @Test fun testCancellation() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -290,7 +290,7 @@ class SimMachineTest { @Test fun testConcurrentRuns() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -313,7 +313,7 @@ class SimMachineTest { @Test fun testClose() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt index 6c9ec7bd..e5b509f0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt @@ -29,8 +29,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.power.SimPowerSource -import org.opendc.simulator.resources.SimResourceInterpreter /** * Test suite for [SimPsu] @@ -55,8 +55,8 @@ internal class SimPsuTest { val ratedOutputPower = 240.0 val energyEfficiency = mapOf(0.0 to 1.0) - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = ratedOutputPower) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = ratedOutputPower) val cpuLogic = mockk<PowerDriver.Logic>() every { cpuLogic.computePower() } returns 0.0 @@ -78,8 +78,8 @@ internal class SimPsuTest { 1.0 to 0.94, ) - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = ratedOutputPower) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = ratedOutputPower) val cpuLogic = mockk<PowerDriver.Logic>() every { cpuLogic.computePower() } returnsMany listOf(50.0, 100.0, 150.0, 200.0) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index 8cd535ad..058d5d28 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -40,7 +40,7 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * Test suite for the [SimHypervisor] class. @@ -94,7 +94,7 @@ internal class SimHypervisorTest { ), ) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener) @@ -163,7 +163,7 @@ internal class SimHypervisorTest { ) ) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -204,7 +204,7 @@ internal class SimHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -234,7 +234,7 @@ internal class SimHypervisorTest { ) val interferenceModel = VmInterferenceModel(groups) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 55d6d7c4..95fb6679 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -40,7 +40,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * A test suite for the [SimSpaceSharedHypervisor]. @@ -74,11 +74,11 @@ internal class SimSpaceSharedHypervisorTest { ), ) - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } val vm = hypervisor.createMachine(machineModel) @@ -98,11 +98,11 @@ internal class SimSpaceSharedHypervisorTest { fun testRuntimeWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -121,11 +121,11 @@ internal class SimSpaceSharedHypervisorTest { fun testFlopsWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -142,11 +142,11 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -170,11 +170,9 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -194,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val interpreter = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt index c39859bf..f557c8d3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt @@ -55,7 +55,7 @@ internal class PStatePowerDriverTest { val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true) every { cpu.capacity } returns 3200.0 - every { cpu.speed } returns 1200.0 + every { cpu.rate } returns 1200.0 val driver = PStatePowerDriver( sortedMapOf( @@ -77,10 +77,10 @@ internal class PStatePowerDriverTest { val cpus = listOf(cpu, cpu) every { cpus[0].capacity } returns 1000.0 - every { cpus[0].speed } returns 1200.0 + every { cpus[0].rate } returns 1200.0 every { cpus[1].capacity } returns 3500.0 - every { cpus[1].speed } returns 1200.0 + every { cpus[1].rate } returns 1200.0 val driver = PStatePowerDriver( sortedMapOf( @@ -112,11 +112,11 @@ internal class PStatePowerDriverTest { val logic = driver.createLogic(machine, listOf(cpu)) - every { cpu.speed } returns 1400.0 + every { cpu.rate } returns 1400.0 every { cpu.capacity } returns 1400.0 assertEquals(150.0, logic.computePower()) - every { cpu.speed } returns 1400.0 + every { cpu.rate } returns 1400.0 every { cpu.capacity } returns 4000.0 assertEquals(235.0, logic.computePower()) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 78019c2e..cdbffe4b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -31,7 +31,7 @@ import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * Test suite for the [SimTraceWorkloadTest] class. @@ -52,7 +52,7 @@ class SimTraceWorkloadTest { @Test fun testSmoke() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -79,7 +79,7 @@ class SimTraceWorkloadTest { @Test fun testOffset() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -106,7 +106,7 @@ class SimTraceWorkloadTest { @Test fun testSkipFragment() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -134,7 +134,7 @@ class SimTraceWorkloadTest { @Test fun testZeroCores() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts index 68047d5c..5a956fee 100644 --- a/opendc-simulator/opendc-simulator-resources/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Uniform resource consumption simulation model" +description = "High-performance flow simulator" plugins { `kotlin-library-conventions` diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index fbc3f319..4834f10f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -20,13 +20,15 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow.source.TraceFlowSource import org.openjdk.jmh.annotations.* import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -36,101 +38,101 @@ import java.util.concurrent.TimeUnit @Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) @OptIn(ExperimentalCoroutinesApi::class) -class SimResourceBenchmarks { +class FlowBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var interpreter: SimResourceInterpreter + private lateinit var engine: FlowEngine @Setup fun setUp() { scope = SimulationCoroutineScope() - interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) + engine = FlowEngine(scope.coroutineContext, scope.clock) } @State(Scope.Thread) class Workload { - lateinit var trace: Sequence<SimTraceConsumer.Fragment> + lateinit var trace: Sequence<TraceFlowSource.Fragment> @Setup fun setUp() { val random = ThreadLocalRandom.current() - val entries = List(10000) { SimTraceConsumer.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } trace = entries.asSequence() } } @Benchmark - fun benchmarkSource(state: Workload) { + fun benchmarkSink(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, interpreter) - return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) + val provider = FlowSink(engine, 4200.0) + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkForwardOverhead(state: Workload) { + fun benchmarkForward(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, interpreter) - val forwarder = SimResourceForwarder() + val provider = FlowSink(engine, 4200.0) + val forwarder = FlowForwarder(engine) provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) + return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { + fun benchmarkMuxMaxMinSingleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(interpreter) + val switch = MaxMinFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) - val provider = switch.newOutput() - return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { + fun benchmarkMuxMaxMinTripleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(interpreter) + val switch = MaxMinFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) repeat(3) { launch { - val provider = switch.newOutput() - provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) } } } } @Benchmark - fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { + fun benchmarkMuxExclusiveSingleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchExclusive() + val switch = ForwardingFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) - val provider = switch.newOutput() - return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { + fun benchmarkMuxExclusiveTripleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchExclusive() + val switch = ForwardingFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) repeat(2) { launch { - val provider = switch.newOutput() - provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index 085cba63..c8092082 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -20,25 +20,22 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow -import org.opendc.simulator.resources.impl.SimResourceCountersImpl +import org.opendc.simulator.flow.internal.FlowCountersImpl /** - * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. + * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. */ -public abstract class SimAbstractResourceProvider( - private val interpreter: SimResourceInterpreter, - initialCapacity: Double -) : SimResourceProvider { +public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { /** - * A flag to indicate that the resource provider is active. + * A flag to indicate that the flow consumer is active. */ public override val isActive: Boolean get() = ctx != null /** - * The capacity of the resource. + * The capacity of the consumer. */ public override var capacity: Double = initialCapacity set(value) { @@ -47,51 +44,51 @@ public abstract class SimAbstractResourceProvider( } /** - * The current processing speed of the resource. + * The current processing rate of the consumer. */ - public override val speed: Double - get() = ctx?.speed ?: 0.0 + public override val rate: Double + get() = ctx?.rate ?: 0.0 /** - * The resource processing speed demand at this instant. + * The flow processing rate demand at this instant. */ public override val demand: Double get() = ctx?.demand ?: 0.0 /** - * The resource counters to track the execution metrics of the resource. + * The flow counters to track the flow metrics of the consumer. */ - public override val counters: SimResourceCounters + public override val counters: FlowCounters get() = _counters - private val _counters = SimResourceCountersImpl() + private val _counters = FlowCountersImpl() /** - * The [SimResourceControllableContext] that is currently running. + * The [FlowConsumerContext] that is currently running. */ - protected var ctx: SimResourceControllableContext? = null + protected var ctx: FlowConsumerContext? = null private set /** - * Construct the [SimResourceProviderLogic] instance for a new consumer. + * Construct the [FlowConsumerLogic] instance for a new source. */ - protected abstract fun createLogic(): SimResourceProviderLogic + protected abstract fun createLogic(): FlowConsumerLogic /** - * Start the specified [SimResourceControllableContext]. + * Start the specified [FlowConsumerContext]. */ - protected open fun start(ctx: SimResourceControllableContext) { + protected open fun start(ctx: FlowConsumerContext) { ctx.start() } /** - * The previous demand for the resource. + * The previous demand for the consumer. */ private var previousDemand = 0.0 /** - * Update the counters of the resource provider. + * Update the counters of the flow consumer. */ - protected fun updateCounters(ctx: SimResourceContext, delta: Long) { + protected fun updateCounters(ctx: FlowConnection, delta: Long) { val demand = previousDemand previousDemand = ctx.demand @@ -102,7 +99,7 @@ public abstract class SimAbstractResourceProvider( val counters = _counters val deltaS = delta / 1000.0 val work = demand * deltaS - val actualWork = ctx.speed * deltaS + val actualWork = ctx.rate * deltaS val remainingWork = work - actualWork counters.demand += work @@ -111,7 +108,7 @@ public abstract class SimAbstractResourceProvider( } /** - * Update the counters of the resource provider. + * Update the counters of the flow consumer. */ protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { val counters = _counters @@ -120,9 +117,9 @@ public abstract class SimAbstractResourceProvider( counters.overcommit += overcommit } - final override fun startConsumer(consumer: SimResourceConsumer) { - check(ctx == null) { "Resource is in invalid state" } - val ctx = interpreter.newContext(consumer, createLogic()) + final override fun startConsumer(source: FlowSource) { + check(ctx == null) { "Consumer is in invalid state" } + val ctx = engine.newContext(source, createLogic()) ctx.capacity = capacity this.ctx = ctx @@ -130,8 +127,8 @@ public abstract class SimAbstractResourceProvider( start(ctx) } - final override fun interrupt() { - ctx?.interrupt() + final override fun pull() { + ctx?.pull() } final override fun cancel() { @@ -142,5 +139,5 @@ public abstract class SimAbstractResourceProvider( } } - override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" + override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index 225cae0b..fa833961 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -20,49 +20,41 @@ * SOFTWARE. */ -package org.opendc.simulator.resources - -import java.time.Clock +package org.opendc.simulator.flow /** - * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a - * resource and a resource consumer. + * An active connection between a [FlowSource] and [FlowConsumer]. */ -public interface SimResourceContext : AutoCloseable { - /** - * The virtual clock tracking simulation time. - */ - public val clock: Clock - +public interface FlowConnection : AutoCloseable { /** - * The resource capacity available at this instant. + * The capacity of the connection. */ public val capacity: Double /** - * The resource processing speed at this instant. + * The flow rate over the connection. */ - public val speed: Double + public val rate: Double /** - * The resource processing speed demand at this instant. + * The flow demand of the source. */ public val demand: Double /** - * Ask the resource provider to interrupt its resource. + * Pull the source. */ - public fun interrupt() + public fun pull() /** - * Push the given flow to this context. + * Push the given flow [rate] over this connection. * * @param rate The rate of the flow to push. */ public fun push(rate: Double) /** - * Stop the resource context. + * Disconnect the consumer from its source. */ public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index b68b7261..3a6e2e97 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -20,77 +20,80 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException /** - * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. + * A consumer of a [FlowSource]. */ -public interface SimResourceProvider { +public interface FlowConsumer { /** - * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer]. + * A flag to indicate that the consumer is currently consuming a [FlowSource]. */ public val isActive: Boolean /** - * The resource capacity available at this instant. + * The flow capacity of this consumer. */ public val capacity: Double /** - * The current processing speed of the resource. + * The current flow rate of the consumer. */ - public val speed: Double + public val rate: Double /** - * The resource processing speed demand at this instant. + * The current flow demand. */ public val demand: Double /** - * The resource counters to track the execution metrics of the resource. + * The flow counters to track the flow metrics of the consumer. */ - public val counters: SimResourceCounters + public val counters: FlowCounters /** - * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. + * Start consuming the specified [source]. * - * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. + * @throws IllegalStateException if the consumer is already active. */ - public fun startConsumer(consumer: SimResourceConsumer) + public fun startConsumer(source: FlowSource) /** - * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. + * Ask the consumer to pull its source. + * + * If the consumer is not active, this operation will be a no-op. */ - public fun interrupt() + public fun pull() /** - * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. + * Disconnect the consumer from its source. + * + * If the consumer is not active, this operation will be a no-op. */ public fun cancel() } /** - * Consume the resource provided by this provider using the specified [consumer] and suspend execution until - * the consumer has finished. + * Consume the specified [source] and suspend execution until the source is fully consumed or failed. */ -public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { +public suspend fun FlowConsumer.consume(source: FlowSource) { return suspendCancellableCoroutine { cont -> - startConsumer(object : SimResourceConsumer by consumer { - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - consumer.onEvent(ctx, event) + startConsumer(object : FlowSource by source { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + source.onEvent(conn, now, event) - if (event == SimResourceEvent.Exit && !cont.isCompleted) { + if (event == FlowEvent.Exit && !cont.isCompleted) { cont.resume(Unit) } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { try { - consumer.onFailure(ctx, cause) + source.onFailure(conn, cause) cont.resumeWithException(cause) } catch (e: Throwable) { e.addSuppressed(cause) @@ -98,7 +101,7 @@ public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { } } - override fun toString(): String = "SimSuspendingResourceConsumer" + override fun toString(): String = "SuspendingFlowSource" }) cont.invokeOnCancellation { cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt new file mode 100644 index 00000000..75b2d25b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A controllable [FlowConnection]. + * + * This interface is used by [FlowConsumer]s to control the connection between it and the source. + */ +public interface FlowConsumerContext : FlowConnection { + /** + * The capacity of the connection. + */ + public override var capacity: Double + + /** + * Start the flow over the connection. + */ + public fun start() + + /** + * Synchronously flush the changes of the connection. + */ + public fun flush() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index cc718165..c69cb17e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -20,23 +20,21 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A collection of callbacks associated with a flow stage. + * A collection of callbacks associated with a [FlowConsumer]. */ -public interface SimResourceProviderLogic { +public interface FlowConsumerLogic { /** - * This method is invoked when the consumer ask to consume the resource for the specified [duration]. + * This method is invoked when a [FlowSource] changes the rate of flow to this consumer. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds. - * @param limit The limit on the work rate of the resource consumer. - * @param duration The duration of the consumption in milliseconds. - * @return The deadline of the resource consumption. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + * @param rate The requested processing rate of the source. */ - public fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {} + public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -45,14 +43,14 @@ public interface SimResourceProviderLogic { * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {} + public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} /** - * This method is invoked when the resource consumer has finished. + * This method is invoked when the [FlowSource] is completed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. */ - public fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index 11924db2..e15d7643 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -20,34 +20,34 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * An interface that tracks cumulative counts of the work performed by a resource. + * An interface that tracks cumulative counts of the flow accumulation over a stage. */ -public interface SimResourceCounters { +public interface FlowCounters { /** - * The amount of work that resource consumers wanted the resource to perform. + * The accumulated flow that a source wanted to push over the connection. */ public val demand: Double /** - * The amount of work performed by the resource. + * The accumulated flow that was actually transferred over the connection. */ public val actual: Double /** - * The amount of work that could not be completed due to overcommitted resources. + * The accumulated flow that could not be transferred over the connection. */ public val overcommit: Double /** - * The amount of work lost due to interference. + * The accumulated flow lost due to interference between sources. */ public val interference: Double /** - * Reset the resource counters. + * Reset the flow counters. */ public fun reset() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt index 4bfeaf20..65224827 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt @@ -20,31 +20,31 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl import java.time.Clock import kotlin.coroutines.CoroutineContext /** - * The resource interpreter is responsible for managing the interaction between resource consumer and provider. + * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s. * - * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation + * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. */ -public interface SimResourceInterpreter { +public interface FlowEngine { /** - * The [Clock] associated with this interpreter. + * The virtual [Clock] associated with this engine. */ public val clock: Clock /** - * Create a new [SimResourceControllableContext] with the given [provider]. + * Create a new [FlowConsumerContext] with the given [provider]. * * @param consumer The consumer logic. * @param provider The logic of the resource provider. */ - public fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext + public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext /** * Start batching the execution of resource updates until [popBatch] is called. @@ -67,14 +67,15 @@ public interface SimResourceInterpreter { public companion object { /** - * Construct a new [SimResourceInterpreter] implementation. + * Construct a new [FlowEngine] implementation. * * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ + @JvmStatic @JvmName("create") - public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter { - return SimResourceInterpreterImpl(context, clock) + public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine { + return FlowEngineImpl(context, clock) } } } @@ -84,7 +85,7 @@ public interface SimResourceInterpreter { * * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. */ -public inline fun SimResourceInterpreter.batch(block: () -> Unit) { +public inline fun FlowEngine.batch(block: () -> Unit) { try { pushBatch() block() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt index 959427f1..14c85183 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt @@ -20,29 +20,29 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A resource event that is communicated to the resource consumer. + * A flow event that is communicated to a [FlowSource]. */ -public enum class SimResourceEvent { +public enum class FlowEvent { /** - * This event is emitted to the consumer when it has started. + * This event is emitted to the source when it has started. */ Start, /** - * This event is emitted to the consumer when it has exited. + * This event is emitted to the source when it is stopped. */ Exit, /** - * This event is emitted to the consumer when it has started a new resource consumption or idle cycle. + * This event is emitted to the source when the system has converged into a steady state. */ - Run, + Converge, /** - * This event is emitted to the consumer when the capacity of the resource has changed. + * This event is emitted to the source when the capacity of the consumer has changed. */ Capacity, } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 0cd2bfc7..2074033e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -20,21 +20,21 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow -import org.opendc.simulator.resources.impl.SimResourceCountersImpl -import java.time.Clock +import org.opendc.simulator.flow.internal.FlowCountersImpl /** - * A class that acts as a [SimResourceConsumer] and [SimResourceProvider] at the same time. + * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. * + * @param engine The [FlowEngine] the forwarder runs in. * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. */ -public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimResourceConsumer, SimResourceProvider, AutoCloseable { +public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { /** - * The delegate [SimResourceConsumer]. + * The delegate [FlowSource]. */ - private var delegate: SimResourceConsumer? = null + private var delegate: FlowSource? = null /** * A flag to indicate that the delegate was started. @@ -42,28 +42,25 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR private var hasDelegateStarted: Boolean = false /** - * The exposed [SimResourceContext]. + * The exposed [FlowConnection]. */ - private val _ctx = object : SimResourceContext { - override val clock: Clock - get() = _innerCtx!!.clock - + private val _ctx = object : FlowConnection { override val capacity: Double get() = _innerCtx?.capacity ?: 0.0 override val demand: Double get() = _innerCtx?.demand ?: 0.0 - override val speed: Double - get() = _innerCtx?.speed ?: 0.0 + override val rate: Double + get() = _innerCtx?.rate ?: 0.0 - override fun interrupt() { - _innerCtx?.interrupt() + override fun pull() { + _innerCtx?.pull() } override fun push(rate: Double) { _innerCtx?.push(rate) - _limit = rate + _demand = rate } override fun close() { @@ -78,14 +75,14 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR // reset beforehand the existing state and check whether it has been updated afterwards reset() - delegate.onEvent(this, SimResourceEvent.Exit) + delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) } } /** - * The [SimResourceContext] in which the forwarder runs. + * The [FlowConnection] in which the forwarder runs. */ - private var _innerCtx: SimResourceContext? = null + private var _innerCtx: FlowConnection? = null override val isActive: Boolean get() = delegate != null @@ -93,27 +90,27 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR override val capacity: Double get() = _ctx.capacity - override val speed: Double - get() = _ctx.speed + override val rate: Double + get() = _ctx.rate override val demand: Double get() = _ctx.demand - override val counters: SimResourceCounters + override val counters: FlowCounters get() = _counters - private val _counters = SimResourceCountersImpl() + private val _counters = FlowCountersImpl() - override fun startConsumer(consumer: SimResourceConsumer) { - check(delegate == null) { "Resource transformer already active" } + override fun startConsumer(source: FlowSource) { + check(delegate == null) { "Forwarder already active" } - delegate = consumer + delegate = source - // Interrupt the provider to replace the consumer - interrupt() + // Pull to replace the source + pull() } - override fun interrupt() { - _ctx.interrupt() + override fun pull() { + _ctx.pull() } override fun cancel() { @@ -124,7 +121,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR this.delegate = null if (ctx != null) { - delegate.onEvent(this._ctx, SimResourceEvent.Exit) + delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit) } } } @@ -134,41 +131,41 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR if (ctx != null) { this._innerCtx = null - ctx.interrupt() + ctx.pull() } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val delegate = delegate if (!hasDelegateStarted) { start() } - updateCounters(ctx, delta) + updateCounters(conn, delta) - return delegate?.onNext(this._ctx, now, delta) ?: Long.MAX_VALUE + return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> { - _innerCtx = ctx + FlowEvent.Start -> { + _innerCtx = conn } - SimResourceEvent.Exit -> { + FlowEvent.Exit -> { _innerCtx = null val delegate = delegate if (delegate != null) { reset() - delegate.onEvent(this._ctx, SimResourceEvent.Exit) + delegate.onEvent(this._ctx, now, FlowEvent.Exit) } } - else -> delegate?.onEvent(this._ctx, event) + else -> delegate?.onEvent(this._ctx, now, event) } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { _innerCtx = null val delegate = delegate @@ -183,7 +180,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR */ private fun start() { val delegate = delegate ?: return - delegate.onEvent(checkNotNull(_innerCtx), SimResourceEvent.Start) + delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start) hasDelegateStarted = true } @@ -197,22 +194,22 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR } /** - * The requested speed. + * The requested flow rate. */ - private var _limit: Double = 0.0 + private var _demand: Double = 0.0 /** - * Update the resource counters for the transformer. + * Update the flow counters for the transformer. */ - private fun updateCounters(ctx: SimResourceContext, delta: Long) { + private fun updateCounters(ctx: FlowConnection, delta: Long) { if (delta <= 0) { return } val counters = _counters val deltaS = delta / 1000.0 - val work = _limit * deltaS - val actualWork = ctx.speed * deltaS + val work = _demand * deltaS + val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork counters.overcommit += (work - actualWork) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index c8d4cf0d..fb6ca85d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -20,42 +20,42 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. + * A [FlowSink] represents a sink with a fixed capacity. * * @param initialCapacity The initial capacity of the resource. - * @param interpreter The interpreter that is used for managing the resource contexts. - * @param parent The parent resource system. + * @param engine The engine that is used for driving the flow simulation. + * @param parent The parent flow system. */ -public class SimResourceSource( +public class FlowSink( + private val engine: FlowEngine, initialCapacity: Double, - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null -) : SimAbstractResourceProvider(interpreter, initialCapacity) { - override fun createLogic(): SimResourceProviderLogic { - return object : SimResourceProviderLogic { - override fun onConsume( - ctx: SimResourceControllableContext, + private val parent: FlowSystem? = null +) : AbstractFlowConsumer(engine, initialCapacity) { + + override fun createLogic(): FlowConsumerLogic { + return object : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, now: Long, delta: Long, - limit: Double, - duration: Long + rate: Double ) { updateCounters(ctx, delta) } - override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { updateCounters(ctx, delta) cancel() } - override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { parent?.onConverge(now) } } } - override fun toString(): String = "SimResourceSource[capacity=$capacity]" + override fun toString(): String = "FlowSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 0b25358a..077b4d38 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -20,38 +20,39 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A [SimResourceConsumer] characterizes how a resource is consumed. + * A source of flow that is consumed by a [FlowConsumer]. * - * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) - * for multiple resource providers, unless explicitly said otherwise. + * Implementations of this interface should be considered stateful and must be assumed not to be re-usable + * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. */ -public interface SimResourceConsumer { +public interface FlowSource { /** - * This method is invoked when the resource provider is pulling this resource consumer. + * This method is invoked when the source is pulled. * - * @param ctx The execution context in which the consumer runs. - * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call in milliseconds. + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the pull is occurring. + * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. * @return The duration after which the resource consumer should be pulled again. */ - public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long + public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long /** * This method is invoked when an event has occurred. * - * @param ctx The execution context in which the consumer runs. + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the event is occurring. * @param event The event that has occurred. */ - public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {} + public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} /** - * This method is invoked when a resource consumer throws an exception. + * This method is invoked when the source throws an exception. * - * @param ctx The execution context in which the consumer runs. + * @param conn The connection between the source and consumer. * @param cause The cause of the failure. */ - public fun onFailure(ctx: SimResourceContext, cause: Throwable) {} + public fun onFailure(conn: FlowConnection, cause: Throwable) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt index 609262cb..db6aa69f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** * A system of possible multiple sub-resources. @@ -28,11 +28,11 @@ package org.opendc.simulator.resources * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the * resource provider. */ -public interface SimResourceSystem { +public interface FlowSystem { /** * The parent system to which this system belongs or `null` if it has no parent. */ - public val parent: SimResourceSystem? + public val parent: FlowSystem? /** * This method is invoked when the system has converged to a steady-state. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt index 1066777f..aa2713b6 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt @@ -1,10 +1,10 @@ -package org.opendc.simulator.resources.interference +package org.opendc.simulator.flow.interference -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.flow.FlowSource /** - * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur - * performance variability due to operating on the same resources and therefore causing interference. + * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur + * performance variability due to operating on the same resource and therefore causing interference. */ public interface InterferenceDomain { /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt index 8b12e7b4..d28ebde5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.interference +package org.opendc.simulator.flow.interference /** * A key that uniquely identifies a participant of an interference domain. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index cbfa7afd..9f3afc4d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -20,28 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.impl +package org.opendc.simulator.flow.internal -import org.opendc.simulator.resources.* -import java.time.Clock +import org.opendc.simulator.flow.* import java.util.ArrayDeque import kotlin.math.max import kotlin.math.min /** - * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. */ -internal class SimResourceContextImpl( - private val interpreter: SimResourceInterpreterImpl, - private val consumer: SimResourceConsumer, - private val logic: SimResourceProviderLogic -) : SimResourceControllableContext { +internal class FlowConsumerContextImpl( + private val engine: FlowEngineImpl, + private val source: FlowSource, + private val logic: FlowConsumerLogic +) : FlowConsumerContext { /** - * The clock of the context. + * The clock to track simulation time. */ - override val clock: Clock - get() = _clock - private val _clock = interpreter.clock + private val _clock = engine.clock /** * The capacity of the resource. @@ -65,7 +62,7 @@ internal class SimResourceContextImpl( /** * The current processing speed of the resource. */ - override val speed: Double + override val rate: Double get() = _rate private var _rate = 0.0 @@ -101,14 +98,14 @@ internal class SimResourceContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque() + private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque() override fun start() { check(_state == State.Pending) { "Consumer is already started" } - interpreter.batch { - consumer.onEvent(this, SimResourceEvent.Start) + engine.batch { + source.onEvent(this, _clock.millis(), FlowEvent.Start) _state = State.Active - interrupt() + pull() } } @@ -117,36 +114,27 @@ internal class SimResourceContextImpl( return } - interpreter.batch { + engine.batch { _state = State.Stopped if (!_updateActive) { - val now = clock.millis() + val now = _clock.millis() val delta = max(0, now - _lastUpdate) doStop(now, delta) // FIX: Make sure the context converges _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(clock.millis()) + scheduleUpdate(_clock.millis()) } } } - override fun interrupt() { + override fun pull() { if (_state == State.Stopped) { return } _flag = _flag or FLAG_INTERRUPT - scheduleUpdate(clock.millis()) - } - - override fun invalidate() { - if (_state == State.Stopped) { - return - } - - _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(clock.millis()) + scheduleUpdate(_clock.millis()) } override fun flush() { @@ -154,7 +142,7 @@ internal class SimResourceContextImpl( return } - interpreter.scheduleSync(clock.millis(), this) + engine.scheduleSync(_clock.millis(), this) } override fun push(rate: Double) { @@ -167,7 +155,8 @@ internal class SimResourceContextImpl( // Invalidate only if the active limit is change and no update is active // If an update is active, it will already get picked up at the end of the update if (_activeLimit != rate && !_updateActive) { - invalidate() + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(_clock.millis()) } } @@ -196,7 +185,7 @@ internal class SimResourceContextImpl( val delta = max(0, now - lastUpdate) try { - val duration = consumer.onNext(this, now, delta) + val duration = source.onPull(this, now, delta) val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration // Reset update flags @@ -205,7 +194,7 @@ internal class SimResourceContextImpl( // Check whether the state has changed after [consumer.onNext] when (_state) { State.Active -> { - logic.onConsume(this, now, delta, _limit, duration) + logic.onPush(this, now, delta, _limit) // Schedule an update at the new deadline scheduleUpdate(now, newDeadline) @@ -261,7 +250,7 @@ internal class SimResourceContextImpl( try { if (_state == State.Active) { - consumer.onEvent(this, SimResourceEvent.Run) + source.onEvent(this, timestamp, FlowEvent.Converge) } logic.onConverge(this, timestamp, delta) @@ -270,14 +259,14 @@ internal class SimResourceContextImpl( } } - override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]" + override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" /** * Stop the resource context. */ private fun doStop(now: Long, delta: Long) { try { - consumer.onEvent(this, SimResourceEvent.Exit) + source.onEvent(this, now, FlowEvent.Exit) logic.onFinish(this, now, delta) } catch (cause: Throwable) { doFail(now, delta, cause) @@ -292,7 +281,7 @@ internal class SimResourceContextImpl( */ private fun doFail(now: Long, delta: Long, cause: Throwable) { try { - consumer.onFailure(this, cause) + source.onFailure(this, cause) } catch (e: Throwable) { e.addSuppressed(cause) e.printStackTrace() @@ -310,11 +299,11 @@ internal class SimResourceContextImpl( return } - interpreter.batch { + engine.batch { // Inform the consumer of the capacity change. This might already trigger an interrupt. - consumer.onEvent(this, SimResourceEvent.Capacity) + source.onEvent(this, _clock.millis(), FlowEvent.Capacity) - interrupt() + pull() } } @@ -322,7 +311,7 @@ internal class SimResourceContextImpl( * Schedule an update for this resource context. */ private fun scheduleUpdate(now: Long) { - interpreter.scheduleImmediate(now, this) + engine.scheduleImmediate(now, this) } /** @@ -331,7 +320,7 @@ internal class SimResourceContextImpl( private fun scheduleUpdate(now: Long, target: Long) { val timers = _timers if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { - timers.addFirst(interpreter.scheduleDelayed(now, this, target)) + timers.addFirst(engine.scheduleDelayed(now, this, target)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 01062179..141d335d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.impl +package org.opendc.simulator.flow.internal -import org.opendc.simulator.resources.SimResourceCounters +import org.opendc.simulator.flow.FlowCounters /** - * Mutable implementation of the [SimResourceCounters] interface. + * Mutable implementation of the [FlowCounters] interface. */ -internal class SimResourceCountersImpl : SimResourceCounters { +internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 override var overcommit: Double = 0.0 @@ -41,6 +41,6 @@ internal class SimResourceCountersImpl : SimResourceCounters { } override fun toString(): String { - return "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 2abf0749..1a50da2c 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -20,25 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.impl +package org.opendc.simulator.flow.internal import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* import java.time.Clock import java.util.* import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext /** - * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after. + * Internal implementation of the [FlowEngine] interface. * * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter { +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -46,24 +46,24 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } /** - * The queue of resource updates that are scheduled for immediate execution. + * The queue of connection updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque<SimResourceContextImpl>() + private val queue = ArrayDeque<FlowConsumerContextImpl>() /** - * A priority queue containing the resource updates to be scheduled in the future. + * A priority queue containing the connection updates to be scheduled in the future. */ private val futureQueue = PriorityQueue<Timer>() /** - * The stack of interpreter invocations to occur in the future. + * The stack of engine invocations to occur in the future. */ private val futureInvocations = ArrayDeque<Invocation>() /** - * The systems that have been visited during the interpreter cycle. + * The systems that have been visited during the engine cycle. */ - private val visited = linkedSetOf<SimResourceContextImpl>() + private val visited = linkedSetOf<FlowConsumerContextImpl>() /** * The index in the batch stack. @@ -71,7 +71,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, private var batchIndex = 0 /** - * A flag to indicate that the interpreter is currently active. + * A flag to indicate that the engine is currently active. */ private val isRunning: Boolean get() = batchIndex > 0 @@ -79,57 +79,57 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * Update the specified [ctx] synchronously. */ - fun scheduleSync(now: Long, ctx: SimResourceContextImpl) { + fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { ctx.doUpdate(now) visited.add(ctx) - // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked - // up by the active interpreter. + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. if (isRunning) { return } try { batchIndex++ - runInterpreter(now) + runEngine(now) } finally { batchIndex-- } } /** - * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. + * Enqueue the specified [ctx] to be updated immediately during the active engine cycle. * - * This method should be used when the state of a resource context is invalidated/interrupted and needs to be - * re-computed. In case no interpreter is currently active, the interpreter will be started. + * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. In case no engine is currently active, the engine will be started. */ - fun scheduleImmediate(now: Long, ctx: SimResourceContextImpl) { + fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) { queue.add(ctx) - // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked - // up by the active interpreter. + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. if (isRunning) { return } try { batchIndex++ - runInterpreter(now) + runEngine(now) } finally { batchIndex-- } } /** - * Schedule the interpreter to run at [target] to update the resource contexts. + * Schedule the engine to run at [target] to update the flow contexts. * * This method will override earlier calls to this method for the same [ctx]. * * @param now The current virtual timestamp. - * @param ctx The resource context to which the event applies. + * @param ctx The flow context to which the event applies. * @param target The timestamp when the interrupt should happen. */ - fun scheduleDelayed(now: Long, ctx: SimResourceContextImpl, target: Long): Timer { + fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer { val futureQueue = futureQueue require(target >= now) { "Timestamp must be in the future" } @@ -140,7 +140,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, return timer } - override fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext = SimResourceContextImpl(this, consumer, provider) + override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) override fun pushBatch() { batchIndex++ @@ -150,7 +150,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, try { // Flush the work if the platform is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - runInterpreter(clock.millis()) + runEngine(clock.millis()) } } finally { batchIndex-- @@ -158,9 +158,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * Interpret all actions that are scheduled for the current timestamp. + * Run all the enqueued actions for the specified [timestamp][now]. */ - private fun runInterpreter(now: Long) { + private fun runEngine(now: Long) { val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations @@ -219,7 +219,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, visited.clear() } while (queue.isNotEmpty()) - // Schedule an interpreter invocation for the next update to occur. + // Schedule an engine invocation for the next update to occur. val headTimer = futureQueue.peek() if (headTimer != null) { trySchedule(now, futureInvocations, headTimer.target) @@ -227,10 +227,10 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * Try to schedule an interpreter invocation at the specified [target]. + * Try to schedule an engine invocation at the specified [target]. * * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the interpreter invocation should happen. + * @param target The virtual timestamp at which the engine invocation should happen. * @param scheduled The queue of scheduled invocations. */ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { @@ -245,7 +245,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, { try { batchIndex++ - runInterpreter(target) + runEngine(target) } finally { batchIndex-- } @@ -267,9 +267,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * A future interpreter invocation. + * A future engine invocation. * - * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case * the invocation is not needed anymore, it can be cancelled via [cancel]. */ private data class Invocation( @@ -277,7 +277,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, @JvmField val handle: DisposableHandle ) { /** - * Cancel the interpreter invocation. + * Cancel the engine invocation. */ fun cancel() = handle.dispose() } @@ -285,10 +285,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * An update call for [ctx] that is scheduled for [target]. * - * This class represents an update in the future at [target] requested by [ctx]. A deferred update might be - * cancelled if the resource context was invalidated in the meantime. + * This class represents an update in the future at [target] requested by [ctx]. */ - class Timer(@JvmField val ctx: SimResourceContextImpl, @JvmField val target: Long) : Comparable<Timer> { + class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> { override fun compareTo(other: Timer): Int { return target.compareTo(other.target) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 3c25b76d..17b82391 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -20,45 +20,48 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.mux -import org.opendc.simulator.resources.interference.InterferenceKey +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowCounters +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.interference.InterferenceKey /** - * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. + * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. */ -public interface SimResourceSwitch { +public interface FlowMultiplexer { /** - * The output resource providers to which resource consumers can be attached. + * The inputs of the multiplexer that can be used to consume sources. */ - public val outputs: Set<SimResourceProvider> + public val inputs: Set<FlowConsumer> /** - * The input resources that will be switched between the output providers. + * The outputs of the multiplexer over which the flows will be distributed. */ - public val inputs: Set<SimResourceProvider> + public val outputs: Set<FlowConsumer> /** - * The resource counters to track the execution metrics of all switch resources. + * The flow counters to track the flow metrics of all multiplexer inputs. */ - public val counters: SimResourceCounters + public val counters: FlowCounters /** - * Create a new output on the switch. + * Create a new input on this multiplexer. * - * @param key The key of the interference member to which the output belongs. + * @param key The key of the interference member to which the input belongs. */ - public fun newOutput(key: InterferenceKey? = null): SimResourceProvider + public fun newInput(key: InterferenceKey? = null): FlowConsumer /** - * Remove [output] from this switch. + * Remove [input] from this multiplexer. */ - public fun removeOutput(output: SimResourceProvider) + public fun removeInput(input: FlowConsumer) /** - * Add the specified [input] to the switch. + * Add the specified [output] to the multiplexer. */ - public fun addInput(input: SimResourceProvider) + public fun addOutput(output: FlowConsumer) /** * Clear all inputs and outputs from the switch. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt new file mode 100644 index 00000000..811d9460 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey +import java.util.ArrayDeque + +/** + * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means + * that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + * + * @param engine The [FlowEngine] driving the simulation. + */ +public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer { + override val inputs: Set<FlowConsumer> + get() = _inputs + private val _inputs = mutableSetOf<Input>() + + override val outputs: Set<FlowConsumer> + get() = _outputs + private val _outputs = mutableSetOf<FlowConsumer>() + private val _availableOutputs = ArrayDeque<FlowForwarder>() + + override val counters: FlowCounters = object : FlowCounters { + override val demand: Double + get() = _outputs.sumOf { it.counters.demand } + override val actual: Double + get() = _outputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _outputs.sumOf { it.counters.overcommit } + override val interference: Double + get() = _outputs.sumOf { it.counters.interference } + + override fun reset() { + for (input in _outputs) { + input.counters.reset() + } + } + + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val output = Input(forwarder) + _inputs += output + return output + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + + (input as Input).close() + } + + override fun addOutput(output: FlowConsumer) { + if (output in outputs) { + return + } + + val forwarder = FlowForwarder(engine) + + _outputs += output + _availableOutputs += forwarder + + output.startConsumer(object : FlowSource by forwarder { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) { + // De-register the output after it has finished + _outputs -= output + } + + forwarder.onEvent(conn, now, event) + } + }) + } + + override fun clear() { + for (input in _outputs) { + input.cancel() + } + _outputs.clear() + + // Inputs are implicitly cancelled by the output forwarders + _inputs.clear() + } + + /** + * An input on the multiplexer. + */ + private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { + /** + * Close the input. + */ + fun close() { + // We explicitly do not close the forwarder here in order to re-use it across input resources. + _inputs -= this + _availableOutputs += forwarder + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt new file mode 100644 index 00000000..9735f121 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceDomain +import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.FlowCountersImpl +import kotlin.math.max +import kotlin.math.min + +/** + * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing. + * + * @param engine The [FlowEngine] to drive the flow simulation. + * @param parent The parent flow system of the multiplexer. + * @param interferenceDomain The interference domain of the multiplexer. + */ +public class MaxMinFlowMultiplexer( + private val engine: FlowEngine, + private val parent: FlowSystem? = null, + private val interferenceDomain: InterferenceDomain? = null +) : FlowMultiplexer { + /** + * The inputs of the multiplexer. + */ + override val inputs: Set<FlowConsumer> + get() = _inputs + private val _inputs = mutableSetOf<Input>() + private val _activeInputs = mutableListOf<Input>() + + /** + * The outputs of the multiplexer. + */ + override val outputs: Set<FlowConsumer> + get() = _outputs + private val _outputs = mutableSetOf<FlowConsumer>() + private val _activeOutputs = mutableListOf<Output>() + + /** + * The flow counters of this multiplexer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + /** + * The actual processing rate of the multiplexer. + */ + private var _rate = 0.0 + + /** + * The demanded processing rate of the input. + */ + private var _demand = 0.0 + + /** + * The capacity of the outputs. + */ + private var _capacity = 0.0 + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val provider = Input(_capacity, key) + _inputs.add(provider) + return provider + } + + override fun addOutput(output: FlowConsumer) { + val consumer = Output(output) + if (_outputs.add(output)) { + _activeOutputs.add(consumer) + output.startConsumer(consumer) + } + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + // This cast should always succeed since only `Input` instances should be added to `_inputs` + (input as Input).close() + } + + override fun clear() { + for (input in _activeOutputs) { + input.cancel() + } + _activeOutputs.clear() + + for (output in _activeInputs) { + output.cancel() + } + _activeInputs.clear() + } + + /** + * Converge the scheduler of the multiplexer. + */ + private fun runScheduler(now: Long) { + if (_schedulerActive) { + return + } + + _schedulerActive = true + try { + doSchedule(now) + } finally { + _schedulerActive = false + } + } + + /** + * Schedule the inputs over the outputs. + */ + private fun doSchedule(now: Long) { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + return + } + + val capacity = _capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pull(now) + + // Remove outputs that have finished + if (!input.isActive) { + inputIterator.remove() + } + } + + var demand = 0.0 + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + var remaining = activeInputs.size + for (input in activeInputs) { + val availableShare = availableCapacity / remaining-- + val grantedRate = min(input.allowedRate, availableShare) + + // Ignore empty sources + if (grantedRate <= 0.0) { + input.actualRate = 0.0 + continue + } + + input.actualRate = grantedRate + demand += input.limit + availableCapacity -= grantedRate + } + + val rate = capacity - availableCapacity + + _demand = demand + _rate = rate + + // Sort all consumers by their capacity + activeOutputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (output in activeOutputs) { + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + } + + /** + * Recompute the capacity of the multiplexer. + */ + private fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) + + // No-op if the capacity is unchanged + if (_capacity == newCapacity) { + return + } + + _capacity = newCapacity + + for (input in _inputs) { + input.capacity = newCapacity + } + } + + /** + * An internal [FlowConsumer] implementation for multiplexer inputs. + */ + private inner class Input(capacity: Double, val key: InterferenceKey?) : + AbstractFlowConsumer(engine, capacity), + FlowConsumerLogic, + Comparable<Input> { + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + + /** + * A flag to indicate that the input is closed. + */ + private var _isClosed: Boolean = false + + /** + * The timestamp at which we received the last command. + */ + private var _lastPull: Long = Long.MIN_VALUE + + /** + * Close the input. + * + * This method is invoked when the user removes an input from the switch. + */ + fun close() { + _isClosed = true + cancel() + } + + /* AbstractFlowConsumer */ + override fun createLogic(): FlowConsumerLogic = this + + override fun start(ctx: FlowConsumerContext) { + check(!_isClosed) { "Cannot re-use closed input" } + + _activeInputs += this + super.start(ctx) + } + + /* FlowConsumerLogic */ + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + doUpdateCounters(delta) + + actualRate = 0.0 + this.limit = rate + _lastPull = now + + runScheduler(now) + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + doUpdateCounters(delta) + + limit = 0.0 + actualRate = 0.0 + _lastPull = now + } + + /* Comparable */ + override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) + + /** + * Pull the source if necessary. + */ + fun pull(now: Long) { + val ctx = ctx + if (ctx != null && _lastPull < now) { + ctx.flush() + } + } + + /** + * Helper method to update the flow counters of the multiplexer. + */ + private fun doUpdateCounters(delta: Long) { + if (delta <= 0L) { + return + } + + // Compute the performance penalty due to flow interference + val perfScore = if (interferenceDomain != null) { + val load = _rate / capacity + interferenceDomain.apply(key, load) + } else { + 1.0 + } + + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualRate * deltaS + val remainingWork = work - actualWork + + updateCounters(work, actualWork, remainingWork) + + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += remainingWork + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + } + + /** + * An internal [FlowSource] implementation for multiplexer outputs. + */ + private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> { + /** + * The active [FlowConnection] of this source. + */ + private var _ctx: FlowConnection? = null + + /** + * The capacity of this output. + */ + val capacity: Double + get() = _ctx?.capacity ?: 0.0 + + /** + * Push the specified rate to the consumer. + */ + fun push(rate: Double) { + _ctx?.push(rate) + } + + /** + * Cancel this output. + */ + fun cancel() { + provider.cancel() + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + runScheduler(now) + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + assert(_ctx == null) { "Source running concurrently" } + _ctx = conn + updateCapacity() + } + FlowEvent.Exit -> { + _ctx = null + updateCapacity() + } + FlowEvent.Capacity -> updateCapacity() + else -> {} + } + } + + override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt index bf76711f..d9779c6a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -20,41 +20,37 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource import kotlin.math.roundToLong /** - * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. + * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization]. */ -public class SimWorkConsumer( - private val work: Double, - private val utilization: Double -) : SimResourceConsumer { +public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource { init { - require(work >= 0.0) { "Work must be positive" } + require(amount >= 0.0) { "Amount must be positive" } require(utilization > 0.0) { "Utilization must be positive" } } - private var remainingWork = work + private var remainingAmount = amount - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val actualWork = ctx.speed * delta / 1000.0 - val limit = ctx.capacity * utilization + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumed = conn.rate * delta / 1000.0 + val limit = conn.capacity * utilization - remainingWork -= actualWork + remainingAmount -= consumed - val remainingWork = remainingWork - val duration = (remainingWork / limit * 1000).roundToLong() + val duration = (remainingAmount / limit * 1000).roundToLong() return if (duration > 0) { - ctx.push(limit) + conn.push(limit) duration } else { - ctx.close() + conn.close() Long.MAX_VALUE } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt index 52a42241..b3191ad3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source /** - * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to - * complete, before proceeding its operation. + * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to + * finish a pull, before proceeding its operation. */ -public class SimConsumerBarrier(public val parties: Int) { +public class FlowSourceBarrier(public val parties: Int) { private var counter = 0 /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 46885640..7fcc0405 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -20,20 +20,20 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource import kotlin.math.min /** * Helper class to expose an observable [speed] field describing the speed of the consumer. */ -public class SimSpeedConsumerAdapter( - private val delegate: SimResourceConsumer, +public class FlowSourceRateAdapter( + private val delegate: FlowSource, private val callback: (Double) -> Unit = {} -) : SimResourceConsumer by delegate { +) : FlowSource by delegate { /** * The resource processing speed at this instant. */ @@ -49,34 +49,34 @@ public class SimSpeedConsumerAdapter( callback(0.0) } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return delegate.onNext(ctx, now, delta) + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { val oldSpeed = speed - delegate.onEvent(ctx, event) + delegate.onEvent(conn, now, event) when (event) { - SimResourceEvent.Run -> speed = ctx.speed - SimResourceEvent.Capacity -> { + FlowEvent.Converge -> speed = conn.rate + FlowEvent.Capacity -> { // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might // need to update the current speed. if (oldSpeed == speed) { - speed = min(ctx.capacity, speed) + speed = min(conn.capacity, speed) } } - SimResourceEvent.Exit -> speed = 0.0 + FlowEvent.Exit -> speed = 0.0 else -> {} } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { speed = 0.0 - delegate.onFailure(ctx, cause) + delegate.onFailure(conn, cause) } - override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" + override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index 4c0e075c..4d3ae61a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -20,21 +20,20 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource /** - * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource - * consumption for some period of time. + * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time. */ -public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { +public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource { private var _iterator: Iterator<Fragment>? = null private var _nextTarget = Long.MIN_VALUE - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget if (nextTarget > now) { @@ -45,21 +44,21 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour return if (iterator.hasNext()) { val fragment = iterator.next() _nextTarget = now + fragment.duration - ctx.push(fragment.usage) + conn.push(fragment.usage) fragment.duration } else { - ctx.close() + conn.close() Long.MAX_VALUE } } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> { - check(_iterator == null) { "Consumer already running" } + FlowEvent.Start -> { + check(_iterator == null) { "Source already running" } _iterator = trace.iterator() } - SimResourceEvent.Exit -> { + FlowEvent.Exit -> { _iterator = null } else -> {} @@ -67,7 +66,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour } /** - * A fragment of the workload. + * A fragment of the tgrace. */ public data class Fragment(val duration: Long, val usage: Double) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt new file mode 100644 index 00000000..061ebea6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.* +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowConsumerContextImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource + +/** + * A test suite for the [FlowConsumerContextImpl] class. + */ +class FlowConsumerContextTest { + @Test + fun testFlushWithoutCommand() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + engine.scheduleSync(engine.clock.millis(), context) + } + + @Test + fun testIntermediateFlush() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = FixedFlowSource(1.0, 1.0) + + val logic = spyk(object : FlowConsumerLogic {}) + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 1.0 + + context.start() + delay(1) // Delay 1 ms to prevent hitting the fast path + engine.scheduleSync(engine.clock.millis(), context) + + verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) } + } + + @Test + fun testDoubleStart() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(0.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + assertThrows<IllegalStateException> { + context.start() + } + } + + @Test + fun testIdempotentCapacityChange() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + }) + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 4200.0 + context.start() + context.capacity = 4200.0 + + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testFailureNoInfiniteLoop() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) throw IllegalStateException("onEvent") + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + throw IllegalStateException("onFailure") + } + }) + + val logic = object : FlowConsumerLogic {} + + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + delay(1) + + verify(exactly = 1) { consumer.onFailure(any(), any()) } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 49e60f68..cbc48a4e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import io.mockk.spyk import io.mockk.verify @@ -29,24 +29,24 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource /** - * A test suite for the [SimResourceForwarder] class. + * A test suite for the [FlowForwarder] class. */ -internal class SimResourceForwarderTest { +internal class FlowForwarderTest { @Test fun testCancelImmediately() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) launch { source.consume(forwarder) } - forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } }) @@ -57,22 +57,22 @@ internal class SimResourceForwarderTest { @Test fun testCancel() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) launch { source.consume(forwarder) } - forwarder.consume(object : SimResourceConsumer { + forwarder.consume(object : FlowSource { var isFirst = true - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) 10 * 1000 } else { - ctx.close() + conn.close() Long.MAX_VALUE } } @@ -84,10 +84,11 @@ internal class SimResourceForwarderTest { @Test fun testState() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } } @@ -108,11 +109,12 @@ internal class SimResourceForwarderTest { @Test fun testCancelPendingDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } }) @@ -120,16 +122,16 @@ internal class SimResourceForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testCancelStartedDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) - val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -137,17 +139,17 @@ internal class SimResourceForwarderTest { yield() forwarder.cancel() - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testCancelPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) - val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -155,19 +157,19 @@ internal class SimResourceForwarderTest { yield() source.cancel() - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testExitPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } } @@ -181,11 +183,11 @@ internal class SimResourceForwarderTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + val consumer = spyk(FixedFlowSource(2.0, 1.0)) source.startConsumer(forwarder) coroutineScope { @@ -195,16 +197,16 @@ internal class SimResourceForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } } @Test fun testCounters() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) - val consumer = SimWorkConsumer(2.0, 1.0) + val consumer = FixedFlowSource(2.0, 1.0) source.startConsumer(forwarder) forwarder.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index e055daf7..010a985e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import io.mockk.every import io.mockk.mockk @@ -30,24 +30,24 @@ import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter /** - * A test suite for the [SimResourceSource] class. + * A test suite for the [FlowSink] class. */ -internal class SimResourceSourceTest { +internal class FlowSinkTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(4200.0, 1.0) + val consumer = FixedFlowSource(4200.0, 1.0) val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val adapter = FlowSourceRateAdapter(consumer, res::add) provider.consume(adapter) @@ -56,10 +56,10 @@ internal class SimResourceSourceTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val provider = SimResourceSource(1.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(engine, 1.0) - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + val consumer = spyk(FixedFlowSource(2.0, 1.0)) coroutineScope { launch { provider.consume(consumer) } @@ -67,19 +67,19 @@ internal class SimResourceSourceTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } } @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(capacity, 2.0) + val consumer = FixedFlowSource(capacity, 2.0) val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val adapter = FlowSourceRateAdapter(consumer, res::add) provider.consume(adapter) @@ -87,23 +87,23 @@ internal class SimResourceSourceTest { } /** - * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or - * [SimResourceConsumer.onNext]. + * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or + * [FlowSource.onPull]. */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - ctx.interrupt() + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + conn.pull() } } @@ -112,28 +112,28 @@ internal class SimResourceSourceTest { @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - lateinit var resCtx: SimResourceContext + val provider = FlowSink(engine, capacity) + lateinit var resCtx: FlowConnection - val consumer = object : SimResourceConsumer { + val consumer = object : FlowSource { var isFirst = true - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> resCtx = ctx + FlowEvent.Start -> resCtx = conn else -> {} } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) 4000 } else { - ctx.close() + conn.close() Long.MAX_VALUE } } @@ -141,7 +141,7 @@ internal class SimResourceSourceTest { launch { yield() - resCtx.interrupt() + resCtx.pull() } provider.consume(consumer) @@ -150,12 +150,12 @@ internal class SimResourceSourceTest { @Test fun testFailure() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } + val consumer = mockk<FlowSource>(relaxUnitFun = true) + every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) } .throws(IllegalStateException()) assertThrows<IllegalStateException> { @@ -165,17 +165,17 @@ internal class SimResourceSourceTest { @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = object : SimResourceConsumer { + val consumer = object : FlowSource { var isFirst = true - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) 1000 } else { throw IllegalStateException() @@ -190,11 +190,11 @@ internal class SimResourceSourceTest { @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(capacity, 1.0) + val consumer = FixedFlowSource(capacity, 1.0) assertThrows<IllegalStateException> { coroutineScope { @@ -206,11 +206,11 @@ internal class SimResourceSourceTest { @Test fun testCancelDuringConsumption() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(capacity, 1.0) + val consumer = FixedFlowSource(capacity, 1.0) launch { provider.consume(consumer) } delay(500) @@ -225,12 +225,12 @@ internal class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows<IllegalStateException> { runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE } provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt index 49f2da5f..b503087e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.mux import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals @@ -28,13 +28,14 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource /** - * Test suite for the [SimResourceSwitchExclusive] class. + * Test suite for the [ForwardingFlowMultiplexer] class. */ internal class SimResourceSwitchExclusiveTest { /** @@ -42,29 +43,29 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val speed = mutableListOf<Double>() val duration = 5 * 60L val workload = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) ), ) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) - val forwarder = SimResourceForwarder() - val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) source.startConsumer(adapter) - switch.addInput(forwarder) + switch.addOutput(forwarder) - val provider = switch.newOutput() + val provider = switch.newInput() provider.consume(workload) yield() @@ -79,17 +80,17 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = SimWorkConsumer(duration * 3.2, 1.0) + val workload = FixedFlowSource(duration * 3.2, 1.0) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) - switch.addInput(source) + switch.addOutput(source) - val provider = switch.newOutput() + val provider = switch.newInput() provider.consume(workload) yield() @@ -101,37 +102,37 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer { + val workload = object : FlowSource { var isFirst = true - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> isFirst = true + FlowEvent.Start -> isFirst = true else -> {} } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) duration } else { - ctx.close() + conn.close() Long.MAX_VALUE } } } - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) - switch.addInput(source) + switch.addOutput(source) - val provider = switch.newOutput() + val provider = switch.newInput() provider.consume(workload) yield() provider.consume(workload) @@ -143,14 +144,14 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) - switch.addInput(source) + switch.addOutput(source) - switch.newOutput() - assertThrows<IllegalStateException> { switch.newOutput() } + switch.newInput() + assertThrows<IllegalStateException> { switch.newInput() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt index 03f90e21..089a8d78 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.mux import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -28,24 +28,26 @@ import kotlinx.coroutines.yield import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.TraceFlowSource /** - * Test suite for the [SimResourceSwitch] implementations + * Test suite for the [FlowMultiplexer] implementations */ internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(scheduler) + val scheduler = FlowEngineImpl(coroutineContext, clock) + val switch = MaxMinFlowMultiplexer(scheduler) - val sources = List(2) { SimResourceSource(2000.0, scheduler) } - sources.forEach { switch.addInput(it) } + val sources = List(2) { FlowSink(scheduler, 2000.0) } + sources.forEach { switch.addOutput(it) } - val provider = switch.newOutput() - val consumer = SimWorkConsumer(2000.0, 1.0) + val provider = switch.newInput() + val consumer = FixedFlowSource(2000.0, 1.0) try { provider.consume(consumer) @@ -60,24 +62,24 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val scheduler = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L val workload = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) ), ) - val switch = SimResourceSwitchMaxMin(scheduler) - val provider = switch.newOutput() + val switch = MaxMinFlowMultiplexer(scheduler) + val provider = switch.newInput() try { - switch.addInput(SimResourceSource(3200.0, scheduler)) + switch.addOutput(FlowSink(scheduler, 3200.0)) provider.consume(workload) yield() } finally { @@ -97,34 +99,34 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val scheduler = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L val workloadA = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) ), ) val workloadB = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3100.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 73.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3100.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 73.0) ) ) - val switch = SimResourceSwitchMaxMin(scheduler) - val providerA = switch.newOutput() - val providerB = switch.newOutput() + val switch = MaxMinFlowMultiplexer(scheduler) + val providerA = switch.newInput() + val providerB = switch.newInput() try { - switch.addInput(SimResourceSource(3200.0, scheduler)) + switch.addOutput(FlowSink(scheduler, 3200.0)) coroutineScope { launch { providerA.consume(workloadA) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt index 830f16d3..8396d346 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt @@ -20,24 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.source import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl /** - * A test suite for the [SimWorkConsumer] class. + * A test suite for the [FixedFlowSource] class. */ -internal class SimWorkConsumerTest { +internal class FixedFlowSourceTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val provider = SimResourceSource(1.0, scheduler) + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) - val consumer = SimWorkConsumer(1.0, 1.0) + val consumer = FixedFlowSource(1.0, 1.0) provider.consume(consumer) assertEquals(1000, clock.millis()) @@ -45,10 +46,10 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val provider = SimResourceSource(1.0, scheduler) + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) - val consumer = SimWorkConsumer(1.0, 0.5) + val consumer = FixedFlowSource(1.0, 0.5) provider.consume(consumer) assertEquals(2000, clock.millis()) diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts index eb9adcd1..a8f94602 100644 --- a/opendc-simulator/opendc-simulator-network/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts @@ -30,6 +30,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt index 102e5625..4b66d5cf 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowSource /** * A network port allows network devices to be connected to network through links. @@ -78,14 +78,14 @@ public abstract class SimNetworkPort { } /** - * Create a [SimResourceConsumer] which generates the outgoing traffic of this port. + * Create a [FlowSource] which generates the outgoing traffic of this port. */ - protected abstract fun createConsumer(): SimResourceConsumer + protected abstract fun createConsumer(): FlowSource /** - * The [SimResourceProvider] which processes the ingoing traffic of this port. + * The [FlowConsumer] which processes the ingoing traffic of this port. */ - protected abstract val provider: SimResourceProvider + protected abstract val provider: FlowConsumer override fun toString(): String = "SimNetworkPort[isConnected=$isConnected]" } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 7db0f176..4b0d7bbd 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -22,22 +22,22 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* /** * A network sink which discards all received traffic and does not generate any traffic itself. */ public class SimNetworkSink( - interpreter: SimResourceInterpreter, + engine: FlowEngine, public val capacity: Double ) : SimNetworkPort() { - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun createConsumer(): FlowSource = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } - override val provider: SimResourceProvider = SimResourceSource(capacity, interpreter) + override val provider: FlowConsumer = FlowSink(engine, capacity) override fun toString(): String = "SimNetworkSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt index 2267715e..2b7c1ad7 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt @@ -22,12 +22,13 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A [SimNetworkSwitch] that can support new networking ports on demand. */ -public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimNetworkSwitch { +public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetworkSwitch { /** * The ports of this switch. */ @@ -36,9 +37,9 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN private val _ports = mutableListOf<Port>() /** - * The [SimResourceSwitchMaxMin] to actually perform the switching. + * The [MaxMinFlowMultiplexer] to actually perform the switching. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val mux = MaxMinFlowMultiplexer(engine) /** * Open a new port on the switch. @@ -58,19 +59,19 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN */ private var isClosed: Boolean = false - override val provider: SimResourceProvider + override val provider: FlowConsumer get() = _provider - private val _provider = switch.newOutput() + private val _provider = mux.newInput() - override fun createConsumer(): SimResourceConsumer { - val forwarder = SimResourceForwarder(isCoupled = true) - switch.addInput(forwarder) + override fun createConsumer(): FlowSource { + val forwarder = FlowForwarder(engine, isCoupled = true) + mux.addOutput(forwarder) return forwarder } override fun close() { isClosed = true - switch.removeOutput(_provider) + mux.removeInput(_provider) _ports.remove(this) } } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index b8c4b00d..45d0bcf0 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -31,8 +31,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimNetworkSink] class. @@ -40,8 +40,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimNetworkSinkTest { @Test fun testInitialState() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertFalse(sink.isConnected) assertNull(sink.link) @@ -50,8 +50,8 @@ class SimNetworkSinkTest { @Test fun testDisconnectIdempotent() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertDoesNotThrow { sink.disconnect() } assertFalse(sink.isConnected) @@ -59,8 +59,8 @@ class SimNetworkSinkTest { @Test fun testConnectCircular() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertThrows<IllegalArgumentException> { sink.connect(sink) @@ -69,8 +69,8 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnectedTarget() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) val source = mockk<SimNetworkPort>(relaxUnitFun = true) every { source.isConnected } returns true @@ -81,9 +81,9 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source1 = Source(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source1 = Source(engine) val source2 = mockk<SimNetworkPort>(relaxUnitFun = true) @@ -97,9 +97,9 @@ class SimNetworkSinkTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) val consumer = source.consumer sink.connect(source) @@ -108,14 +108,14 @@ class SimNetworkSinkTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), SimResourceEvent.Start) } + verify { consumer.onEvent(any(), any(), FlowEvent.Start) } } @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) val consumer = source.consumer sink.connect(source) @@ -124,14 +124,14 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } - private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() { - val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8)) + private class Source(engine: FlowEngine) : SimNetworkPort() { + val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8)) - public override fun createConsumer(): SimResourceConsumer = consumer + public override fun createConsumer(): FlowSource = consumer - override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter) + override val provider: FlowConsumer = FlowSink(engine, 0.0) } } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt index 3a749bfe..4aa2fa92 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt @@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimNetworkSwitchVirtual] class. @@ -37,10 +37,10 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimNetworkSwitchVirtualTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) - val switch = SimNetworkSwitchVirtual(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) + val switch = SimNetworkSwitchVirtual(engine) val consumer = source.consumer switch.newPort().connect(sink) @@ -50,14 +50,14 @@ class SimNetworkSwitchVirtualTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), SimResourceEvent.Start) } + verify { consumer.onEvent(any(), any(), FlowEvent.Start) } } @Test fun testConnectClosedPort() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val switch = SimNetworkSwitchVirtual(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val switch = SimNetworkSwitchVirtual(engine) val port = switch.newPort() port.close() @@ -67,11 +67,11 @@ class SimNetworkSwitchVirtualTest { } } - private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() { - val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8)) + private class Source(engine: FlowEngine) : SimNetworkPort() { + val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8)) - public override fun createConsumer(): SimResourceConsumer = consumer + public override fun createConsumer(): FlowSource = consumer - override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter) + override val provider: FlowConsumer = FlowSink(engine, 0.0) } } diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts index f2a49964..e4342a6a 100644 --- a/opendc-simulator/opendc-simulator-power/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts @@ -30,6 +30,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index 1a12a52a..c33f5186 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -22,46 +22,48 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A model of a Power Distribution Unit (PDU). * - * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood. + * @param engine The underlying [FlowEngine] to drive the simulation under the hood. * @param idlePower The idle power consumption of the PDU independent of the load on the PDU. * @param lossCoefficient The coefficient for the power loss of the PDU proportional to the square load. */ public class SimPdu( - interpreter: SimResourceInterpreter, + engine: FlowEngine, private val idlePower: Double = 0.0, private val lossCoefficient: Double = 0.0, ) : SimPowerInlet() { /** - * The [SimResourceSwitch] that distributes the electricity over the PDU outlets. + * The [FlowMultiplexer] that distributes the electricity over the PDU outlets. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val mux = MaxMinFlowMultiplexer(engine) /** - * The [SimResourceForwarder] that represents the input of the PDU. + * The [FlowForwarder] that represents the input of the PDU. */ - private val forwarder = SimResourceForwarder() + private val forwarder = FlowForwarder(engine) /** * Create a new PDU outlet. */ - public fun newOutlet(): Outlet = Outlet(switch, switch.newOutput()) + public fun newOutlet(): Outlet = Outlet(mux, mux.newInput()) init { - switch.addInput(forwarder) + mux.addOutput(forwarder) } - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by forwarder { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val duration = forwarder.onNext(ctx, now, delta) - val loss = computePowerLoss(ctx.demand) - val newLimit = ctx.demand + loss + override fun createConsumer(): FlowSource = object : FlowSource by forwarder { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val duration = forwarder.onPull(conn, now, delta) + val loss = computePowerLoss(conn.demand) + val newLimit = conn.demand + loss - ctx.push(newLimit) + conn.push(newLimit) return duration } @@ -81,7 +83,7 @@ public class SimPdu( /** * A PDU outlet. */ - public class Outlet(private val switch: SimResourceSwitch, private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable { + public class Outlet(private val switch: FlowMultiplexer, private val provider: FlowConsumer) : SimPowerOutlet(), AutoCloseable { override fun onConnect(inlet: SimPowerInlet) { provider.startConsumer(inlet.createConsumer()) } @@ -94,7 +96,7 @@ public class SimPdu( * Remove the outlet from the PDU. */ override fun close() { - switch.removeOutput(provider) + switch.removeInput(provider) } override fun toString(): String = "SimPdu.Outlet" diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt index 0ac1f199..851b28a5 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.flow.FlowSource /** * An abstract inlet that consumes electricity from a power outlet. @@ -42,7 +42,7 @@ public abstract class SimPowerInlet { internal var _outlet: SimPowerOutlet? = null /** - * Create a [SimResourceConsumer] which represents the consumption of electricity from the power outlet. + * Create a [FlowSource] which represents the consumption of electricity from the power outlet. */ - public abstract fun createConsumer(): SimResourceConsumer + public abstract fun createConsumer(): FlowSource } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt index 3ef8ccc6..7faebd75 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt @@ -22,25 +22,25 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSource +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSink /** * A [SimPowerOutlet] that represents a source of electricity. * - * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood. + * @param engine The underlying [FlowEngine] to drive the simulation under the hood. */ -public class SimPowerSource(interpreter: SimResourceInterpreter, public val capacity: Double) : SimPowerOutlet() { +public class SimPowerSource(engine: FlowEngine, public val capacity: Double) : SimPowerOutlet() { /** * The resource source that drives this power source. */ - private val source = SimResourceSource(capacity, interpreter) + private val source = FlowSink(engine, capacity) /** * The power draw at this instant. */ public val powerDraw: Double - get() = source.speed + get() = source.rate override fun onConnect(inlet: SimPowerInlet) { source.startConsumer(inlet.createConsumer()) diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt index 9c7400ed..5eaa91af 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt @@ -22,50 +22,51 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A model of an Uninterruptible Power Supply (UPS). * * This model aggregates multiple power sources into a single source in order to ensure that power is always available. * - * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood. + * @param engine The underlying [FlowEngine] to drive the simulation under the hood. * @param idlePower The idle power consumption of the UPS independent of the load. * @param lossCoefficient The coefficient for the power loss of the UPS proportional to the load. */ public class SimUps( - interpreter: SimResourceInterpreter, + private val engine: FlowEngine, private val idlePower: Double = 0.0, private val lossCoefficient: Double = 0.0, ) : SimPowerOutlet() { /** * The resource aggregator used to combine the input sources. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val switch = MaxMinFlowMultiplexer(engine) /** - * The [SimResourceProvider] that represents the output of the UPS. + * The [FlowConsumer] that represents the output of the UPS. */ - private val provider = switch.newOutput() + private val provider = switch.newInput() /** * Create a new UPS outlet. */ public fun newInlet(): SimPowerInlet { - val forward = SimResourceForwarder(isCoupled = true) - switch.addInput(forward) + val forward = FlowForwarder(engine, isCoupled = true) + switch.addOutput(forward) return Inlet(forward) } override fun onConnect(inlet: SimPowerInlet) { val consumer = inlet.createConsumer() - provider.startConsumer(object : SimResourceConsumer by consumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val duration = consumer.onNext(ctx, now, delta) - val loss = computePowerLoss(ctx.demand) - val newLimit = ctx.demand + loss + provider.startConsumer(object : FlowSource by consumer { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val duration = consumer.onPull(conn, now, delta) + val loss = computePowerLoss(conn.demand) + val newLimit = conn.demand + loss - ctx.push(newLimit) + conn.push(newLimit) return duration } }) @@ -86,8 +87,8 @@ public class SimUps( /** * A UPS inlet. */ - public inner class Inlet(private val forwarder: SimResourceForwarder) : SimPowerInlet(), AutoCloseable { - override fun createConsumer(): SimResourceConsumer = forwarder + public inner class Inlet(private val forwarder: FlowForwarder) : SimPowerInlet(), AutoCloseable { + override fun createConsumer(): FlowSource = forwarder /** * Remove the inlet from the PSU. diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index 17a174b7..568a1e8c 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceEvent -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimPdu] class. @@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimPduTest { @Test fun testZeroOutlets() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) assertEquals(0.0, source.powerDraw) @@ -49,9 +49,9 @@ internal class SimPduTest { @Test fun testSingleOutlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) pdu.newOutlet().connect(SimpleInlet()) @@ -60,9 +60,9 @@ internal class SimPduTest { @Test fun testDoubleOutlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) pdu.newOutlet().connect(SimpleInlet()) @@ -73,28 +73,28 @@ internal class SimPduTest { @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) - val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0)) + val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0)) val inlet = object : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = consumer + override fun createConsumer(): FlowSource = consumer } val outlet = pdu.newOutlet() outlet.connect(inlet) outlet.disconnect() - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testLoss() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) // https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN - val pdu = SimPdu(interpreter, idlePower = 1.5, lossCoefficient = 0.015) + val pdu = SimPdu(engine, idlePower = 1.5, lossCoefficient = 0.015) source.connect(pdu) pdu.newOutlet().connect(SimpleInlet()) assertEquals(89.0, source.powerDraw, 0.01) @@ -102,9 +102,9 @@ internal class SimPduTest { @Test fun testOutletClose() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) val outlet = pdu.newOutlet() outlet.close() @@ -115,6 +115,6 @@ internal class SimPduTest { } class SimpleInlet : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5) + override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5) } } diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index f3829ba1..b411e292 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -31,10 +31,10 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceEvent -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimPowerSource] @@ -42,8 +42,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimPowerSourceTest { @Test fun testInitialState() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) assertFalse(source.isConnected) assertNull(source.inlet) @@ -52,8 +52,8 @@ internal class SimPowerSourceTest { @Test fun testDisconnectIdempotent() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) assertDoesNotThrow { source.disconnect() } assertFalse(source.isConnected) @@ -61,8 +61,8 @@ internal class SimPowerSourceTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = SimpleInlet() source.connect(inlet) @@ -76,27 +76,27 @@ internal class SimPowerSourceTest { @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0)) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0)) val inlet = object : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = consumer + override fun createConsumer(): FlowSource = consumer } source.connect(inlet) source.disconnect() - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testDisconnectAssertion() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = mockk<SimPowerInlet>(relaxUnitFun = true) every { inlet.isConnected } returns false every { inlet._outlet } returns null - every { inlet.createConsumer() } returns SimWorkConsumer(100.0, utilization = 1.0) + every { inlet.createConsumer() } returns FixedFlowSource(100.0, utilization = 1.0) source.connect(inlet) @@ -107,8 +107,8 @@ internal class SimPowerSourceTest { @Test fun testOutletAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = SimpleInlet() source.connect(inlet) @@ -121,8 +121,8 @@ internal class SimPowerSourceTest { @Test fun testInletAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = mockk<SimPowerInlet>(relaxUnitFun = true) every { inlet.isConnected } returns true @@ -132,6 +132,6 @@ internal class SimPowerSourceTest { } class SimpleInlet : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 1.0) + override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 1.0) } } diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index 8d5fa857..31ac0b39 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceEvent -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimUps] class. @@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimUpsTest { @Test fun testSingleInlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val ups = SimUps(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val ups = SimUps(engine) source.connect(ups.newInlet()) ups.connect(SimpleInlet()) @@ -50,10 +50,10 @@ internal class SimUpsTest { @Test fun testDoubleInlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source1 = SimPowerSource(interpreter, capacity = 100.0) - val source2 = SimPowerSource(interpreter, capacity = 100.0) - val ups = SimUps(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source1 = SimPowerSource(engine, capacity = 100.0) + val source2 = SimPowerSource(engine, capacity = 100.0) + val ups = SimUps(engine) source1.connect(ups.newInlet()) source2.connect(ups.newInlet()) @@ -67,10 +67,10 @@ internal class SimUpsTest { @Test fun testLoss() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) // https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN - val ups = SimUps(interpreter, idlePower = 4.0, lossCoefficient = 0.05) + val ups = SimUps(engine, idlePower = 4.0, lossCoefficient = 0.05) source.connect(ups.newInlet()) ups.connect(SimpleInlet()) @@ -79,24 +79,24 @@ internal class SimUpsTest { @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source1 = SimPowerSource(interpreter, capacity = 100.0) - val source2 = SimPowerSource(interpreter, capacity = 100.0) - val ups = SimUps(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source1 = SimPowerSource(engine, capacity = 100.0) + val source2 = SimPowerSource(engine, capacity = 100.0) + val ups = SimUps(engine) source1.connect(ups.newInlet()) source2.connect(ups.newInlet()) - val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0)) + val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0)) val inlet = object : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = consumer + override fun createConsumer(): FlowSource = consumer } ups.connect(inlet) ups.disconnect() - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } class SimpleInlet : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5) + override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt deleted file mode 100644 index b406b896..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A controllable [SimResourceContext]. - * - * This interface is used by resource providers to control the resource context. - */ -public interface SimResourceControllableContext : SimResourceContext { - /** - * The capacity of the resource. - */ - public override var capacity: Double - - /** - * Start the resource context. - */ - public fun start() - - /** - * Invalidate the resource context's state. - * - * By invalidating the resource context's current state, the state is re-computed and the current progress is - * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the - * synchronous variant. - */ - public fun invalidate() - - /** - * Synchronously flush the progress of the resource context and materialize its current progress. - */ - public fun flush() -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt deleted file mode 100644 index f1e004d2..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.interference.InterferenceKey -import java.util.ArrayDeque - -/** - * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that - * a single output is directly connected to an input and that the switch can only support as many outputs as inputs. - */ -public class SimResourceSwitchExclusive : SimResourceSwitch { - override val outputs: Set<SimResourceProvider> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - - private val _inputs = mutableSetOf<SimResourceProvider>() - override val inputs: Set<SimResourceProvider> - get() = _inputs - private val _availableInputs = ArrayDeque<SimResourceForwarder>() - - override val counters: SimResourceCounters = object : SimResourceCounters { - override val demand: Double - get() = _inputs.sumOf { it.counters.demand } - override val actual: Double - get() = _inputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _inputs.sumOf { it.counters.overcommit } - override val interference: Double - get() = _inputs.sumOf { it.counters.interference } - - override fun reset() { - for (input in _inputs) { - input.counters.reset() - } - } - - override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" - } - - /** - * Add an output to the switch. - */ - override fun newOutput(key: InterferenceKey?): SimResourceProvider { - val forwarder = checkNotNull(_availableInputs.poll()) { "No capacity to serve request" } - val output = Output(forwarder) - _outputs += output - return output - } - - override fun removeOutput(output: SimResourceProvider) { - if (!_outputs.remove(output)) { - return - } - - (output as Output).close() - } - - /** - * Add an input to the switch. - */ - override fun addInput(input: SimResourceProvider) { - if (input in inputs) { - return - } - - val forwarder = SimResourceForwarder() - - _inputs += input - _availableInputs += forwarder - - input.startConsumer(object : SimResourceConsumer by forwarder { - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - if (event == SimResourceEvent.Exit) { - // De-register the input after it has finished - _inputs -= input - } - - forwarder.onEvent(ctx, event) - } - }) - } - - override fun clear() { - for (input in _inputs) { - input.cancel() - } - _inputs.clear() - - // Outputs are implicitly cancelled by the inputs forwarders - _outputs.clear() - } - - /** - * An output of the resource switch. - */ - private inner class Output(private val forwarder: SimResourceForwarder) : SimResourceProvider by forwarder { - /** - * Close the output. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across output resources. - _outputs -= this - _availableInputs += forwarder - } - - override fun toString(): String = "SimResourceSwitchExclusive.Output" - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt deleted file mode 100644 index 574fb443..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.impl.SimResourceCountersImpl -import org.opendc.simulator.resources.interference.InterferenceDomain -import org.opendc.simulator.resources.interference.InterferenceKey -import kotlin.math.max -import kotlin.math.min - -/** - * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min - * fair sharing. - * - * @param interpreter The interpreter for managing the resource contexts. - * @param parent The parent resource system of the switch. - * @param interferenceDomain The interference domain of the switch. - */ -public class SimResourceSwitchMaxMin( - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, - private val interferenceDomain: InterferenceDomain? = null -) : SimResourceSwitch { - /** - * The output resource providers to which resource consumers can be attached. - */ - override val outputs: Set<SimResourceProvider> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - private val _activeOutputs: MutableList<Output> = mutableListOf() - - /** - * The input resources that will be switched between the output providers. - */ - override val inputs: Set<SimResourceProvider> - get() = _inputs - private val _inputs = mutableSetOf<SimResourceProvider>() - private val _activeInputs = mutableListOf<Input>() - - /** - * The resource counters of this switch. - */ - public override val counters: SimResourceCounters - get() = _counters - private val _counters = SimResourceCountersImpl() - - /** - * The actual processing rate of the switch. - */ - private var _rate = 0.0 - - /** - * The demanded processing rate of the outputs. - */ - private var _demand = 0.0 - - /** - * The capacity of the switch. - */ - private var _capacity = 0.0 - - /** - * Flag to indicate that the scheduler is active. - */ - private var _schedulerActive = false - - /** - * Add an output to the switch. - */ - override fun newOutput(key: InterferenceKey?): SimResourceProvider { - val provider = Output(_capacity, key) - _outputs.add(provider) - return provider - } - - /** - * Add the specified [input] to the switch. - */ - override fun addInput(input: SimResourceProvider) { - val consumer = Input(input) - if (_inputs.add(input)) { - _activeInputs.add(consumer) - input.startConsumer(consumer) - } - } - - /** - * Remove [output] from this switch. - */ - override fun removeOutput(output: SimResourceProvider) { - if (!_outputs.remove(output)) { - return - } - // This cast should always succeed since only `Output` instances should be added to _outputs - (output as Output).close() - } - - override fun clear() { - for (input in _activeInputs) { - input.cancel() - } - _activeInputs.clear() - - for (output in _activeOutputs) { - output.cancel() - } - _activeOutputs.clear() - } - - /** - * Run the scheduler of the switch. - */ - private fun runScheduler(now: Long) { - if (_schedulerActive) { - return - } - - _schedulerActive = true - try { - doSchedule(now) - } finally { - _schedulerActive = false - } - } - - /** - * Schedule the outputs over the input. - */ - private fun doSchedule(now: Long) { - // If there is no work yet, mark the input as idle. - if (_activeOutputs.isEmpty()) { - return - } - - val capacity = _capacity - var availableCapacity = capacity - - // Pull in the work of the outputs - val outputIterator = _activeOutputs.listIterator() - for (output in outputIterator) { - output.pull(now) - - // Remove outputs that have finished - if (!output.isActive) { - outputIterator.remove() - } - } - - var demand = 0.0 - - // Sort in-place the outputs based on their requested usage. - // Profiling shows that it is faster than maintaining some kind of sorted set. - _activeOutputs.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - var remaining = _activeOutputs.size - for (output in _activeOutputs) { - val availableShare = availableCapacity / remaining-- - val grantedSpeed = min(output.allowedRate, availableShare) - - // Ignore idle computation - if (grantedSpeed <= 0.0) { - output.actualRate = 0.0 - continue - } - - demand += output.limit - - output.actualRate = grantedSpeed - availableCapacity -= grantedSpeed - } - - val rate = capacity - availableCapacity - - _demand = demand - _rate = rate - - // Sort all consumers by their capacity - _activeInputs.sort() - - // Divide the requests over the available capacity of the input resources fairly - for (input in _activeInputs) { - val inputCapacity = input.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - input.push(grantedSpeed) - } - } - - /** - * Recompute the capacity of the switch. - */ - private fun updateCapacity() { - val newCapacity = _activeInputs.sumOf(Input::capacity) - - // No-op if the capacity is unchanged - if (_capacity == newCapacity) { - return - } - - _capacity = newCapacity - - for (output in _outputs) { - output.capacity = newCapacity - } - } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class Output(capacity: Double, val key: InterferenceKey?) : - SimAbstractResourceProvider(interpreter, capacity), - SimResourceProviderLogic, - Comparable<Output> { - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - - /** - * The actual processing speed. - */ - @JvmField var actualRate: Double = 0.0 - - /** - * The processing speed that is allowed by the model constraints. - */ - val allowedRate: Double - get() = min(capacity, limit) - - /** - * A flag to indicate that the output is closed. - */ - private var _isClosed: Boolean = false - - /** - * The timestamp at which we received the last command. - */ - private var _lastPull: Long = Long.MIN_VALUE - - /** - * Close the output. - * - * This method is invoked when the user removes an output from the switch. - */ - fun close() { - _isClosed = true - cancel() - } - - /* SimAbstractResourceProvider */ - override fun createLogic(): SimResourceProviderLogic = this - - override fun start(ctx: SimResourceControllableContext) { - check(!_isClosed) { "Cannot re-use closed output" } - - _activeOutputs += this - super.start(ctx) - } - - /* SimResourceProviderLogic */ - override fun onConsume( - ctx: SimResourceControllableContext, - now: Long, - delta: Long, - limit: Double, - duration: Long - ) { - doUpdateCounters(delta) - - actualRate = 0.0 - this.limit = limit - _lastPull = now - - runScheduler(now) - } - - override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { - parent?.onConverge(now) - } - - override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { - doUpdateCounters(delta) - - limit = 0.0 - actualRate = 0.0 - _lastPull = now - } - - /* Comparable */ - override fun compareTo(other: Output): Int = allowedRate.compareTo(other.allowedRate) - - /** - * Pull the next command if necessary. - */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && _lastPull < now) { - ctx.flush() - } - } - - /** - * Helper method to update the resource counters of the distributor. - */ - private fun doUpdateCounters(delta: Long) { - if (delta <= 0L) { - return - } - - // Compute the performance penalty due to resource interference - val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity - interferenceDomain.apply(key, load) - } else { - 1.0 - } - - val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork - - updateCounters(work, actualWork, remainingWork) - - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) - } - } - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private inner class Input(private val provider: SimResourceProvider) : SimResourceConsumer, Comparable<Input> { - /** - * The active [SimResourceContext] of this consumer. - */ - private var _ctx: SimResourceContext? = null - - /** - * The capacity of this input. - */ - val capacity: Double - get() = _ctx?.capacity ?: 0.0 - - /** - * Push the specified rate to the provider. - */ - fun push(rate: Double) { - _ctx?.push(rate) - } - - /** - * Cancel this input. - */ - fun cancel() { - provider.cancel() - } - - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - runScheduler(now) - return Long.MAX_VALUE - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - assert(_ctx == null) { "Consumer running concurrently" } - _ctx = ctx - updateCapacity() - } - SimResourceEvent.Exit -> { - _ctx = null - updateCapacity() - } - SimResourceEvent.Capacity -> updateCapacity() - else -> {} - } - } - - override fun compareTo(other: Input): Int = capacity.compareTo(other.capacity) - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt deleted file mode 100644 index 1428ce42..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.* -import kotlinx.coroutines.* -import org.junit.jupiter.api.* -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceContextImpl -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl - -/** - * A test suite for the [SimResourceContextImpl] class. - */ -class SimResourceContextTest { - @Test - fun testFlushWithoutCommand() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(1.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } - - val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(interpreter, consumer, logic) - - interpreter.scheduleSync(interpreter.clock.millis(), context) - } - - @Test - fun testIntermediateFlush() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = SimWorkConsumer(1.0, 1.0) - - val logic = spyk(object : SimResourceProviderLogic {}) - val context = SimResourceContextImpl(interpreter, consumer, logic) - context.capacity = 1.0 - - context.start() - delay(1) // Delay 1 ms to prevent hitting the fast path - interpreter.scheduleSync(interpreter.clock.millis(), context) - - verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } - } - - @Test - fun testIntermediateFlushIdle() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = SimWorkConsumer(1.0, 1.0) - - val logic = spyk(object : SimResourceProviderLogic {}) - val context = SimResourceContextImpl(interpreter, consumer, logic) - context.capacity = 1.0 - - context.start() - delay(500) - context.invalidate() - delay(500) - context.invalidate() - - assertAll( - { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } }, - { verify(exactly = 1) { logic.onFinish(any(), any(), any()) } } - ) - } - - @Test - fun testDoubleStart() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(0.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } - - val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(interpreter, consumer, logic) - - context.start() - - assertThrows<IllegalStateException> { - context.start() - } - } - - @Test - fun testIdempotentCapacityChange() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(1.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - }) - - val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(interpreter, consumer, logic) - context.capacity = 4200.0 - context.start() - context.capacity = 4200.0 - - verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) } - } - - @Test - fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() - return Long.MAX_VALUE - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent") - } - - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { - throw IllegalStateException("onFailure") - } - }) - - val logic = object : SimResourceProviderLogic {} - - val context = SimResourceContextImpl(interpreter, consumer, logic) - - context.start() - - delay(1) - - verify(exactly = 1) { consumer.onFailure(any(), any()) } - } -} |
