diff options
94 files changed, 1759 insertions, 1813 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index a8b8afe9..f499927d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -47,7 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.roundToLong @@ -61,7 +61,7 @@ public class SimHost( model: MachineModel, override val meta: Map<String, Any>, context: CoroutineContext, - interpreter: SimResourceInterpreter, + engine: FlowEngine, meterProvider: MeterProvider, hypervisor: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), @@ -78,7 +78,7 @@ public class SimHost( /** * The clock instance used by the host. */ - private val clock = interpreter.clock + private val clock = engine.clock /** * The logger instance of this server. @@ -98,13 +98,13 @@ public class SimHost( /** * The machine to run on. */ - public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model.optimize(), powerDriver) + public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver) /** * The hypervisor to run multiple workloads. */ private val hypervisor: SimHypervisor = hypervisor.create( - interpreter, + engine, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain, listener = object : SimHypervisor.Listener { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 7f33154a..eda76ba0 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -198,7 +198,7 @@ internal class Guest( } /** - * Run the process that models the virtual machine lifecycle as a coroutine. + * Converge the process that models the virtual machine lifecycle as a coroutine. */ private suspend fun runMachine(workload: SimWorkload) { delay(1) // TODO Introduce model for boot time diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt index 6919b7fd..7d46e626 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt @@ -75,7 +75,7 @@ internal class HostFaultInjectorImpl( } /** - * Run the injection process. + * Converge the injection process. */ private suspend fun runInjector() { while (true) { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e75c31a0..d2293be7 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -41,7 +41,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.HOST_ID import org.opendc.telemetry.compute.table.HostData @@ -87,14 +87,14 @@ internal class SimHostTest { .setClock(clock.toOtelClock()) .build() - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val virtDriver = SimHost( uid = hostId, name = "test", model = machineModel, meta = emptyMap(), coroutineContext, - interpreter, + engine, meterProvider, SimFairShareHypervisorProvider() ) @@ -199,14 +199,14 @@ internal class SimHostTest { .setClock(clock.toOtelClock()) .build() - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val host = SimHost( uid = hostId, name = "test", model = machineModel, meta = emptyMap(), coroutineContext, - interpreter, + engine, meterProvider, SimFairShareHypervisorProvider() ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt index ed45bd8a..283f82fe 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt @@ -36,7 +36,7 @@ import org.opendc.compute.simulator.SimHost import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.* import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock @@ -73,9 +73,9 @@ public class ComputeWorkloadRunner( private val _metricProducers = mutableListOf<MetricProducer>() /** - * The [SimResourceInterpreter] to simulate the hosts. + * The [FlowEngine] to simulate the hosts. */ - private val interpreter = SimResourceInterpreter(context, clock) + private val engine = FlowEngine(context, clock) /** * The hosts that belong to this class. @@ -89,7 +89,7 @@ public class ComputeWorkloadRunner( } /** - * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. */ public suspend fun run(trace: List<VirtualMachine>, seed: Long) { val random = Random(seed) @@ -178,7 +178,7 @@ public class ComputeWorkloadRunner( spec.model, spec.meta, context, - interpreter, + engine, meterProvider, spec.hypervisor, powerDriver = spec.powerDriver, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 21ff3ab0..4e855f82 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -131,7 +131,7 @@ abstract class Portfolio(name: String) : Experiment(name) { // Instantiate the desired topology runner.apply(topology) - // Run the workload trace + // Converge the workload trace runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { runner.close() diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 1fbd3b6a..2ba65e90 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* import java.time.Clock import java.util.* import kotlin.coroutines.Continuation @@ -65,7 +65,7 @@ public class SimTFDevice( * The [SimMachine] representing the device. */ private val machine = SimBareMetalMachine( - SimResourceInterpreter(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)), + FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)), SimplePowerDriver(powerModel) ) @@ -95,11 +95,11 @@ public class SimTFDevice( /** * The workload that will be run by the device. */ - private val workload = object : SimWorkload, SimResourceConsumer { + private val workload = object : SimWorkload, FlowSource { /** * The resource context to interrupt the workload with. */ - var ctx: SimResourceContext? = null + var ctx: FlowConnection? = null /** * The capacity of the device. @@ -128,16 +128,16 @@ public class SimTFDevice( } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val consumedWork = ctx.speed * delta / 1000.0 + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumedWork = conn.rate * delta / 1000.0 val activeWork = activeWork if (activeWork != null) { if (activeWork.consume(consumedWork)) { this.activeWork = null } else { - val duration = (activeWork.flops / ctx.capacity * 1000).roundToLong() - ctx.push(ctx.capacity) + val duration = (activeWork.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) return duration } } @@ -146,27 +146,27 @@ public class SimTFDevice( val head = queue.poll() return if (head != null) { this.activeWork = head - val duration = (head.flops / ctx.capacity * 1000).roundToLong() - ctx.push(ctx.capacity) + val duration = (head.flops / conn.capacity * 1000).roundToLong() + conn.push(conn.capacity) duration } else { - ctx.push(0.0) + conn.push(0.0) Long.MAX_VALUE } } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - this.capacity = ctx.capacity + FlowEvent.Start -> { + this.ctx = conn + this.capacity = conn.capacity } - SimResourceEvent.Capacity -> { - this.capacity = ctx.capacity - ctx.interrupt() + FlowEvent.Capacity -> { + this.capacity = conn.capacity + conn.pull() } - SimResourceEvent.Run -> { - _usage.record(ctx.speed) + FlowEvent.Converge -> { + _usage.record(conn.rate) _power.record(machine.psu.powerDraw) } else -> {} @@ -188,7 +188,7 @@ public class SimTFDevice( override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont -> workload.queue.add(Work(flops, cont)) if (workload.isIdle) { - workload.ctx?.interrupt() + workload.ctx?.pull() } } diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt index 5839c0df..3e755b56 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt @@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute */ public interface Strategy { /** - * Run the specified batch using the given strategy. + * Converge the specified batch using the given strategy. */ public suspend fun run(forward: Double, backward: Double, batchSize: Int) } diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 68bdc337..020d75b5 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -36,7 +36,7 @@ import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import java.time.Clock import java.util.ArrayDeque import kotlin.coroutines.Continuation @@ -74,7 +74,7 @@ public class SimFunctionDeployer( * The machine that will execute the workloads. */ public val machine: SimMachine = SimBareMetalMachine( - SimResourceInterpreter(scope.coroutineContext, clock), + FlowEngine(scope.coroutineContext, clock), model, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index 7d06ee62..e2290a14 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -31,7 +31,7 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 88ad7286..c57919c1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -36,7 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import org.openjdk.jmh.annotations.* import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -48,13 +48,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var interpreter: SimResourceInterpreter + private lateinit var engine: FlowEngine private lateinit var machineModel: MachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) + engine = FlowEngine(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -80,7 +80,7 @@ class SimMachineBenchmarks { fun benchmarkBareMetal(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace)) } @@ -90,9 +90,9 @@ class SimMachineBenchmarks { fun benchmarkSpaceSharedHypervisor(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } @@ -111,9 +111,9 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorSingle(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(interpreter) + val hypervisor = SimFairShareHypervisor(engine) launch { machine.run(hypervisor) } @@ -132,9 +132,9 @@ class SimMachineBenchmarks { fun benchmarkFairShareHypervisorDouble(state: Workload) { return scope.runBlockingSimulation { val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(interpreter) + val hypervisor = SimFairShareHypervisor(engine) launch { machine.run(hypervisor) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index f9db048d..6a62d8a5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -30,22 +30,22 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* import kotlin.coroutines.Continuation import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. * - * @param interpreter The interpreter to manage the machine's resources. + * @param engine The engine to manage the machine's resources. * @param parent The parent simulation system. * @param model The model of the machine. */ public abstract class SimAbstractMachine( - protected val interpreter: SimResourceInterpreter, - final override val parent: SimResourceSystem?, + protected val engine: FlowEngine, + final override val parent: FlowSystem?, final override val model: MachineModel -) : SimMachine, SimResourceSystem { +) : SimMachine, FlowSystem { /** * The resources allocated for this machine. */ @@ -54,17 +54,17 @@ public abstract class SimAbstractMachine( /** * The memory interface of the machine. */ - public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory) + public val memory: SimMemory = Memory(FlowSink(engine, model.memory.sumOf { it.size }.toDouble()), model.memory) /** * The network interfaces available to the machine. */ - public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) } + public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(engine, adapter, i) } /** * The network interfaces available to the machine. */ - public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) } + public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(engine, device, i) } /** * The peripherals of the machine. @@ -82,7 +82,7 @@ public abstract class SimAbstractMachine( private var cont: Continuation<Unit>? = null /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) { check(!isTerminated) { "Machine is terminated" } @@ -96,14 +96,14 @@ public abstract class SimAbstractMachine( // Cancel all cpus on cancellation cont.invokeOnCancellation { this.cont = null - interpreter.batch { + engine.batch { for (cpu in cpus) { cpu.cancel() } } } - interpreter.batch { workload.onStart(ctx) } + engine.batch { workload.onStart(ctx) } } } @@ -120,7 +120,7 @@ public abstract class SimAbstractMachine( * Cancel the workload that is currently running on the machine. */ private fun cancel() { - interpreter.batch { + engine.batch { for (cpu in cpus) { cpu.cancel() } @@ -137,8 +137,8 @@ public abstract class SimAbstractMachine( * The execution context in which the workload runs. */ private inner class Context(override val meta: Map<String, Any>) : SimMachineContext { - override val interpreter: SimResourceInterpreter - get() = this@SimAbstractMachine.interpreter + override val engine: FlowEngine + get() = this@SimAbstractMachine.engine override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus @@ -154,7 +154,7 @@ public abstract class SimAbstractMachine( /** * The [SimMemory] implementation for a machine. */ - private class Memory(source: SimResourceSource, override val models: List<MemoryUnit>) : SimMemory, SimResourceProvider by source { + private class Memory(source: FlowSink, override val models: List<MemoryUnit>) : SimMemory, FlowConsumer by source { override fun toString(): String = "SimAbstractMachine.Memory" } @@ -162,6 +162,7 @@ public abstract class SimAbstractMachine( * The [SimNetworkAdapter] implementation for a machine. */ private class NetworkAdapterImpl( + private val engine: FlowEngine, model: NetworkAdapter, index: Int ) : SimNetworkAdapter(), SimNetworkInterface { @@ -169,18 +170,18 @@ public abstract class SimAbstractMachine( override val bandwidth: Double = model.bandwidth - override val provider: SimResourceProvider + override val provider: FlowConsumer get() = _rx - override fun createConsumer(): SimResourceConsumer = _tx + override fun createConsumer(): FlowSource = _tx - override val tx: SimResourceProvider + override val tx: FlowConsumer get() = _tx - private val _tx = SimResourceForwarder() + private val _tx = FlowForwarder(engine) - override val rx: SimResourceConsumer + override val rx: FlowSource get() = _rx - private val _rx = SimResourceForwarder() + private val _rx = FlowForwarder(engine) override fun toString(): String = "SimAbstractMachine.NetworkAdapterImpl[name=$name,bandwidth=$bandwidth]" } @@ -189,7 +190,7 @@ public abstract class SimAbstractMachine( * The [SimStorageInterface] implementation for a machine. */ private class StorageDeviceImpl( - interpreter: SimResourceInterpreter, + engine: FlowEngine, model: StorageDevice, index: Int ) : SimStorageInterface { @@ -197,9 +198,9 @@ public abstract class SimAbstractMachine( override val capacity: Double = model.capacity - override val read: SimResourceProvider = SimResourceSource(model.readBandwidth, interpreter) + override val read: FlowConsumer = FlowSink(engine, model.readBandwidth) - override val write: SimResourceProvider = SimResourceSource(model.writeBandwidth, interpreter) + override val write: FlowConsumer = FlowSink(engine, model.writeBandwidth) override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 639ca450..37cf282b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -26,8 +26,8 @@ import org.opendc.simulator.compute.device.SimPsu import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.PowerDriver -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.FlowEngine /** * A simulated bare-metal machine that is able to run a single workload. @@ -35,19 +35,19 @@ import org.opendc.simulator.resources.SimResourceInterpreter * A [SimBareMetalMachine] is a stateful object, and you should be careful when operating this object concurrently. For * example, the class expects only a single concurrent call to [run]. * - * @param interpreter The [SimResourceInterpreter] to drive the simulation. + * @param engine The [FlowEngine] to drive the simulation. * @param model The machine model to simulate. * @param powerDriver The power driver to use. * @param psu The power supply of the machine. * @param parent The parent simulation system. */ public class SimBareMetalMachine( - interpreter: SimResourceInterpreter, + engine: FlowEngine, model: MachineModel, powerDriver: PowerDriver, public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)), - parent: SimResourceSystem? = null, -) : SimAbstractMachine(interpreter, parent, model) { + parent: FlowSystem? = null, +) : SimAbstractMachine(engine, parent, model) { /** * The power draw of the machine onto the PSU. */ @@ -58,7 +58,7 @@ public class SimBareMetalMachine( * The processing units of the machine. */ override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu -> - Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu) + Cpu(FlowSink(engine, cpu.frequency, this@SimBareMetalMachine), cpu) } /** @@ -78,9 +78,9 @@ public class SimBareMetalMachine( * A [SimProcessingUnit] of a bare-metal machine. */ private class Cpu( - private val source: SimResourceSource, + private val source: FlowSink, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceProvider by source { + ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity set(value) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index d8dd8205..ab0b56ae 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -41,7 +41,7 @@ public interface SimMachine : AutoCloseable { public val peripherals: List<SimPeripheral> /** - * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. */ public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap()) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt index 6996a30d..1317f728 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * A simulated execution context in which a bootable image runs. This interface represents the @@ -31,9 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter */ public interface SimMachineContext : AutoCloseable { /** - * The resource interpreter that simulates the machine. + * The [FlowEngine] that simulates the machine. */ - public val interpreter: SimResourceInterpreter + public val engine: FlowEngine /** * The metadata associated with the context. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt index 6623df23..b1aef495 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt @@ -23,12 +23,12 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer /** * An interface to control the memory usage of simulated workloads. */ -public interface SimMemory : SimResourceProvider { +public interface SimMemory : FlowConsumer { /** * The models representing the static information of the memory units supporting this interface. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt index 1ac126ae..660b2871 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowSource /** * A firmware interface to a network adapter. @@ -42,10 +42,10 @@ public interface SimNetworkInterface { /** * The resource provider for the transmit channel of the network interface. */ - public val tx: SimResourceProvider + public val tx: FlowConsumer /** * The resource consumer for the receive channel of the network interface. */ - public val rx: SimResourceConsumer + public val rx: FlowSource } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt index 93c9ddfa..c9f36ece 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt @@ -23,12 +23,12 @@ package org.opendc.simulator.compute import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer /** * A simulated processing unit. */ -public interface SimProcessingUnit : SimResourceProvider { +public interface SimProcessingUnit : FlowConsumer { /** * The capacity of the processing unit, which can be adjusted by the workload if supported by the machine. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt index 21a801f1..3d648671 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.compute -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer /** * A firmware interface to a storage device. @@ -41,10 +41,10 @@ public interface SimStorageInterface { /** * The resource provider for the read operations of the storage device. */ - public val read: SimResourceProvider + public val read: FlowConsumer /** * The resource consumer for the write operation of the storage device. */ - public val write: SimResourceProvider + public val write: FlowConsumer } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 6e6e590f..b05d8ad9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -23,10 +23,10 @@ package org.opendc.simulator.compute.device import org.opendc.simulator.compute.power.PowerDriver +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource import org.opendc.simulator.power.SimPowerInlet -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent import java.util.* /** @@ -54,7 +54,7 @@ public class SimPsu( /** * The consumer context. */ - private var _ctx: SimResourceContext? = null + private var _ctx: FlowConnection? = null /** * The driver that is connected to the PSU. @@ -69,7 +69,7 @@ public class SimPsu( * Update the power draw of the PSU. */ public fun update() { - _ctx?.interrupt() + _ctx?.pull() } /** @@ -81,18 +81,18 @@ public class SimPsu( update() } - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun createConsumer(): FlowSource = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) - ctx.push(powerDraw) + conn.push(powerDraw) return Long.MAX_VALUE } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> _ctx = ctx - SimResourceEvent.Run -> _powerDraw = ctx.speed - SimResourceEvent.Exit -> _ctx = null + FlowEvent.Start -> _ctx = conn + FlowEvent.Converge -> _powerDraw = conn.rate + FlowEvent.Exit -> _ctx = null else -> {} } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index cf9e3230..b145eefc 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,17 +28,17 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.SimResourceSwitch +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer /** * Abstract implementation of the [SimHypervisor] interface. * - * @param interpreter The resource interpreter to use. + * @param engine The [FlowEngine] to drive the simulation. * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware. */ public abstract class SimAbstractHypervisor( - private val interpreter: SimResourceInterpreter, + protected val engine: FlowEngine, private val scalingGovernor: ScalingGovernor? = null, protected val interferenceDomain: VmInterferenceDomain? = null ) : SimHypervisor { @@ -50,7 +50,7 @@ public abstract class SimAbstractHypervisor( /** * The resource switch to use. */ - private lateinit var switch: SimResourceSwitch + private lateinit var mux: FlowMultiplexer /** * The virtual machines running on this hypervisor. @@ -62,8 +62,8 @@ public abstract class SimAbstractHypervisor( /** * The resource counters associated with the hypervisor. */ - public override val counters: SimResourceCounters - get() = switch.counters + public override val counters: FlowCounters + get() = mux.counters /** * The scaling governors attached to the physical CPUs backing this hypervisor. @@ -71,14 +71,14 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf<ScalingGovernor.Logic>() /** - * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs. + * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs. */ - public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch + public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer /** * Check whether the specified machine model fits on this hypervisor. */ - public abstract fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean + public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean /** * Trigger the governors to recompute the scaling limits. @@ -91,7 +91,7 @@ public abstract class SimAbstractHypervisor( /* SimHypervisor */ override fun canFit(model: MachineModel): Boolean { - return canFit(model, switch) + return canFit(model, mux) } override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine { @@ -104,7 +104,7 @@ public abstract class SimAbstractHypervisor( /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx - switch = createSwitch(ctx) + mux = createMultiplexer(ctx) for (cpu in ctx.cpus) { val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu)) @@ -113,7 +113,7 @@ public abstract class SimAbstractHypervisor( governor.onStart() } - switch.addInput(cpu) + mux.addOutput(cpu) } } @@ -122,7 +122,7 @@ public abstract class SimAbstractHypervisor( * * @param model The machine model of the virtual machine. */ - private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) { + private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) { /** * The interference key of this virtual machine. */ @@ -131,7 +131,7 @@ public abstract class SimAbstractHypervisor( /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { VCpu(switch, switch.newOutput(interferenceKey), it) } + override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) } override fun close() { super.close() @@ -153,10 +153,10 @@ public abstract class SimAbstractHypervisor( * A [SimProcessingUnit] of a virtual machine. */ private class VCpu( - private val switch: SimResourceSwitch, - private val source: SimResourceProvider, + private val switch: FlowMultiplexer, + private val source: FlowConsumer, override val model: ProcessingUnit - ) : SimProcessingUnit, SimResourceProvider by source { + ) : SimProcessingUnit, FlowConsumer by source { override var capacity: Double get() = source.capacity set(_) { @@ -169,7 +169,7 @@ public abstract class SimAbstractHypervisor( * Close the CPU */ fun close() { - switch.removeOutput(source) + switch.removeInput(source) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 3b44292d..36ab7c1c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -28,39 +28,39 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSwitch -import org.opendc.simulator.resources.SimResourceSwitchMaxMin -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine] * concurrently using weighted fair sharing. * - * @param interpreter The interpreter to manage the machine's resources. + * @param engine The [FlowEngine] to manage the machine's resources. * @param parent The parent simulation system. * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor. * @param interferenceDomain The resource interference domain to which the hypervisor belongs. * @param listener The hypervisor listener to use. */ public class SimFairShareHypervisor( - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, + engine: FlowEngine, + private val parent: FlowSystem? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, private val listener: SimHypervisor.Listener? = null -) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) { +) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) { - override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true + override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { + override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer { return SwitchSystem(ctx).switch } - private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem { - val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain) + private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowSystem { + val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain) - override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent + override val parent: FlowSystem? = this@SimFairShareHypervisor.parent private var lastCpuUsage = 0.0 private var lastCpuDemand = 0.0 @@ -87,8 +87,8 @@ public class SimFairShareHypervisor( } lastReport = timestamp - lastCpuDemand = switch.inputs.sumOf { it.demand } - lastCpuUsage = switch.inputs.sumOf { it.speed } + lastCpuDemand = switch.outputs.sumOf { it.demand } + lastCpuUsage = switch.outputs.sumOf { it.rate } lastDemand = counters.demand lastActual = counters.actual lastOvercommit = counters.overcommit diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index 8d0592ec..bfa099fb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -34,13 +34,13 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" override fun create( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem?, + engine: FlowEngine, + parent: FlowSystem?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? ): SimHypervisor = SimFairShareHypervisor( - interpreter, + engine, parent, scalingGovernor = scalingGovernor, interferenceDomain = interferenceDomain, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 3b49d515..1b11ca6b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.resources.SimResourceCounters +import org.opendc.simulator.flow.FlowCounters /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload @@ -40,7 +40,7 @@ public interface SimHypervisor : SimWorkload { /** * The resource counters associated with the hypervisor. */ - public val counters: SimResourceCounters + public val counters: FlowCounters /** * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index b307a34d..97f07097 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem /** * A service provider interface for constructing a [SimHypervisor]. @@ -43,8 +43,8 @@ public interface SimHypervisorProvider { * Create a [SimHypervisor] instance with the specified [listener]. */ public fun create( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem? = null, + engine: FlowEngine, + parent: FlowSystem? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, listener: SimHypervisor.Listener? = null diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index ac1c0250..883e0d82 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -24,19 +24,19 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSwitch -import org.opendc.simulator.resources.SimResourceSwitchExclusive +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer /** * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. */ -public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) { - override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean { - return switch.inputs.size - switch.outputs.size >= model.cpus.size +public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) { + override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean { + return switch.outputs.size - switch.inputs.size >= model.cpus.size } - override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { - return SimResourceSwitchExclusive() + override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer { + return ForwardingFlowMultiplexer(engine) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index 3906cb9a..7869d72d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSystem +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSystem /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -34,10 +34,10 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" override fun create( - interpreter: SimResourceInterpreter, - parent: SimResourceSystem?, + engine: FlowEngine, + parent: FlowSystem?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? - ): SimHypervisor = SimSpaceSharedHypervisor(interpreter) + ): SimHypervisor = SimSpaceSharedHypervisor(engine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 1801fcd0..b737d61a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.resources.interference.InterferenceDomain -import org.opendc.simulator.resources.interference.InterferenceKey +import org.opendc.simulator.flow.interference.InterferenceDomain +import org.opendc.simulator.flow.interference.InterferenceKey /** * The interference domain of a hypervisor. diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index c2e00c8e..b3d72507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.resources.interference.InterferenceKey +import org.opendc.simulator.flow.interference.InterferenceKey import java.util.* /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt index 6577fbfc..f71446f8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt @@ -46,7 +46,7 @@ public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver { for (cpu in cpus) { targetFreq = max(cpu.capacity, targetFreq) - totalSpeed += cpu.speed + totalSpeed += cpu.rate } val maxFreq = states.lastKey() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt index bf7aeff1..34e91c35 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt @@ -37,7 +37,7 @@ public class SimplePowerDriver(private val model: PowerModel) : PowerDriver { for (cpu in cpus) { targetFreq += cpu.capacity - totalSpeed += cpu.speed + totalSpeed += cpu.rate } return model.computePower(totalSpeed / targetFreq) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index a01fa20c..99f4a1e1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.source.FixedFlowSource /** * A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on @@ -44,7 +44,7 @@ public class SimFlopsWorkload( override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { - cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization))) + cpu.startConsumer(lifecycle.waitFor(FixedFlowSource(flops.toDouble() / ctx.cpus.size, utilization))) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 4ee56689..2ef3bc43 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.source.FixedFlowSource /** * A [SimWorkload] that models application execution as a single duration. @@ -44,7 +44,7 @@ public class SimRuntimeWorkload( val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { val limit = cpu.capacity * utilization - cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization))) + cpu.startConsumer(lifecycle.waitFor(FixedFlowSource((limit / 1000) * duration, utilization))) } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index dd582bb2..a877dac1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource import kotlin.math.min /** @@ -78,12 +78,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val return now >= timestamp + duration } - private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + private inner class Consumer(val cpu: ProcessingUnit) : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val fragment = pullFragment(now) if (fragment == null) { - ctx.close() + conn.close() return Long.MAX_VALUE } @@ -91,7 +91,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val // Fragment is in the future if (timestamp > now) { - ctx.push(0.0) + conn.push(0.0) return timestamp - now } @@ -103,7 +103,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val val deadline = timestamp + fragment.duration val duration = deadline - now - ctx.push(if (cpu.id < cores && usage > 0.0) usage else 0.0) + conn.push(if (cpu.id < cores && usage > 0.0) usage else 0.0) return duration } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index 5dd18271..dabe60e0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -23,9 +23,9 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource /** * A helper class to manage the lifecycle of a [SimWorkload] @@ -34,27 +34,27 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf<SimResourceConsumer>() + private val waiting = mutableSetOf<FlowSource>() /** * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer { + public fun waitFor(consumer: FlowSource): FlowSource { waiting.add(consumer) - return object : SimResourceConsumer by consumer { - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + return object : FlowSource by consumer { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { try { - consumer.onEvent(ctx, event) + consumer.onEvent(conn, now, event) } finally { - if (event == SimResourceEvent.Exit) { + if (event == FlowEvent.Exit) { complete(consumer) } } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { try { - consumer.onFailure(ctx, cause) + consumer.onFailure(conn, cause) } finally { complete(consumer) } @@ -65,9 +65,9 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { } /** - * Complete the specified [SimResourceConsumer]. + * Complete the specified [FlowSource]. */ - private fun complete(consumer: SimResourceConsumer) { + private fun complete(consumer: FlowSource) { if (waiting.remove(consumer) && waiting.isEmpty()) { ctx.close() } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 81268879..0bb24ed8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -34,10 +34,10 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.compute.workload.SimWorkloadLifecycle import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.source.FixedFlowSource import org.opendc.simulator.network.SimNetworkSink import org.opendc.simulator.power.SimPowerSource -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer /** * Test suite for the [SimBareMetalMachine] class. @@ -60,7 +60,7 @@ class SimMachineTest { @Test fun testFlopsWorkload() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -83,7 +83,7 @@ class SimMachineTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -100,13 +100,13 @@ class SimMachineTest { @Test fun testPower() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(LinearPowerModel(100.0, 50.0)) ) - val source = SimPowerSource(interpreter, capacity = 1000.0) + val source = SimPowerSource(engine, capacity = 1000.0) source.connect(machine.psu) try { @@ -125,7 +125,7 @@ class SimMachineTest { @Test fun testCapacityClamp() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -151,7 +151,7 @@ class SimMachineTest { @Test fun testMemory() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -171,7 +171,7 @@ class SimMachineTest { @Test fun testMemoryUsage() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -180,7 +180,7 @@ class SimMachineTest { machine.run(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) - ctx.memory.startConsumer(lifecycle.waitFor(SimWorkConsumer(ctx.memory.capacity, utilization = 0.8))) + ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8))) } }) @@ -192,22 +192,22 @@ class SimMachineTest { @Test fun testNetUsage() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) val adapter = (machine.peripherals[0] as SimNetworkAdapter) - adapter.connect(SimNetworkSink(interpreter, adapter.bandwidth)) + adapter.connect(SimNetworkSink(engine, adapter.bandwidth)) try { machine.run(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) val iface = ctx.net[0] - iface.tx.startConsumer(lifecycle.waitFor(SimWorkConsumer(iface.bandwidth, utilization = 0.8))) + iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8))) } }) @@ -219,9 +219,9 @@ class SimMachineTest { @Test fun testDiskReadUsage() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -231,7 +231,7 @@ class SimMachineTest { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) val disk = ctx.storage[0] - disk.read.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.read.capacity, utilization = 0.8))) + disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8))) } }) @@ -243,9 +243,9 @@ class SimMachineTest { @Test fun testDiskWriteUsage() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -255,7 +255,7 @@ class SimMachineTest { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) val disk = ctx.storage[0] - disk.write.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.write.capacity, utilization = 0.8))) + disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8))) } }) @@ -268,7 +268,7 @@ class SimMachineTest { @Test fun testCancellation() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -290,7 +290,7 @@ class SimMachineTest { @Test fun testConcurrentRuns() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -313,7 +313,7 @@ class SimMachineTest { @Test fun testClose() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt index 6c9ec7bd..e5b509f0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt @@ -29,8 +29,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine import org.opendc.simulator.power.SimPowerSource -import org.opendc.simulator.resources.SimResourceInterpreter /** * Test suite for [SimPsu] @@ -55,8 +55,8 @@ internal class SimPsuTest { val ratedOutputPower = 240.0 val energyEfficiency = mapOf(0.0 to 1.0) - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = ratedOutputPower) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = ratedOutputPower) val cpuLogic = mockk<PowerDriver.Logic>() every { cpuLogic.computePower() } returns 0.0 @@ -78,8 +78,8 @@ internal class SimPsuTest { 1.0 to 0.94, ) - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = ratedOutputPower) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = ratedOutputPower) val cpuLogic = mockk<PowerDriver.Logic>() every { cpuLogic.computePower() } returnsMany listOf(50.0, 100.0, 150.0, 200.0) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index 8cd535ad..058d5d28 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -40,7 +40,7 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * Test suite for the [SimHypervisor] class. @@ -94,7 +94,7 @@ internal class SimHypervisorTest { ), ) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0))) val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener) @@ -163,7 +163,7 @@ internal class SimHypervisorTest { ) ) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -204,7 +204,7 @@ internal class SimHypervisorTest { memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } ) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -234,7 +234,7 @@ internal class SimHypervisorTest { ) val interferenceModel = VmInterferenceModel(groups) - val platform = SimResourceInterpreter(coroutineContext, clock) + val platform = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( platform, model, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index 55d6d7c4..95fb6679 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -40,7 +40,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * A test suite for the [SimSpaceSharedHypervisor]. @@ -74,11 +74,11 @@ internal class SimSpaceSharedHypervisorTest { ), ) - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } val vm = hypervisor.createMachine(machineModel) @@ -98,11 +98,11 @@ internal class SimSpaceSharedHypervisorTest { fun testRuntimeWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimRuntimeWorkload(duration) - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -121,11 +121,11 @@ internal class SimSpaceSharedHypervisorTest { fun testFlopsWorkload() = runBlockingSimulation { val duration = 5 * 60L * 1000 val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -142,11 +142,11 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testTwoWorkloads() = runBlockingSimulation { val duration = 5 * 60L * 1000 - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val engine = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) + engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -170,11 +170,9 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val machine = SimBareMetalMachine( - interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) - ) - val hypervisor = SimSpaceSharedHypervisor(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))) + val hypervisor = SimSpaceSharedHypervisor(engine) launch { machine.run(hypervisor) } yield() @@ -194,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest { */ @Test fun testConcurrentWorkloadSucceeds() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val interpreter = FlowEngine(coroutineContext, clock) val machine = SimBareMetalMachine( interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt index c39859bf..f557c8d3 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt @@ -55,7 +55,7 @@ internal class PStatePowerDriverTest { val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true) every { cpu.capacity } returns 3200.0 - every { cpu.speed } returns 1200.0 + every { cpu.rate } returns 1200.0 val driver = PStatePowerDriver( sortedMapOf( @@ -77,10 +77,10 @@ internal class PStatePowerDriverTest { val cpus = listOf(cpu, cpu) every { cpus[0].capacity } returns 1000.0 - every { cpus[0].speed } returns 1200.0 + every { cpus[0].rate } returns 1200.0 every { cpus[1].capacity } returns 3500.0 - every { cpus[1].speed } returns 1200.0 + every { cpus[1].rate } returns 1200.0 val driver = PStatePowerDriver( sortedMapOf( @@ -112,11 +112,11 @@ internal class PStatePowerDriverTest { val logic = driver.createLogic(machine, listOf(cpu)) - every { cpu.speed } returns 1400.0 + every { cpu.rate } returns 1400.0 every { cpu.capacity } returns 1400.0 assertEquals(150.0, logic.computePower()) - every { cpu.speed } returns 1400.0 + every { cpu.rate } returns 1400.0 every { cpu.capacity } returns 4000.0 assertEquals(235.0, logic.computePower()) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index 78019c2e..cdbffe4b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -31,7 +31,7 @@ import org.opendc.simulator.compute.model.* import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine /** * Test suite for the [SimTraceWorkloadTest] class. @@ -52,7 +52,7 @@ class SimTraceWorkloadTest { @Test fun testSmoke() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -79,7 +79,7 @@ class SimTraceWorkloadTest { @Test fun testOffset() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -106,7 +106,7 @@ class SimTraceWorkloadTest { @Test fun testSkipFragment() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) @@ -134,7 +134,7 @@ class SimTraceWorkloadTest { @Test fun testZeroCores() = runBlockingSimulation { val machine = SimBareMetalMachine( - SimResourceInterpreter(coroutineContext, clock), + FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0)) ) diff --git a/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts index 68047d5c..5a956fee 100644 --- a/opendc-simulator/opendc-simulator-resources/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Uniform resource consumption simulation model" +description = "High-performance flow simulator" plugins { `kotlin-library-conventions` diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index fbc3f319..4834f10f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -20,13 +20,15 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow.source.TraceFlowSource import org.openjdk.jmh.annotations.* import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit @@ -36,101 +38,101 @@ import java.util.concurrent.TimeUnit @Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) @OptIn(ExperimentalCoroutinesApi::class) -class SimResourceBenchmarks { +class FlowBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var interpreter: SimResourceInterpreter + private lateinit var engine: FlowEngine @Setup fun setUp() { scope = SimulationCoroutineScope() - interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) + engine = FlowEngine(scope.coroutineContext, scope.clock) } @State(Scope.Thread) class Workload { - lateinit var trace: Sequence<SimTraceConsumer.Fragment> + lateinit var trace: Sequence<TraceFlowSource.Fragment> @Setup fun setUp() { val random = ThreadLocalRandom.current() - val entries = List(10000) { SimTraceConsumer.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } trace = entries.asSequence() } } @Benchmark - fun benchmarkSource(state: Workload) { + fun benchmarkSink(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, interpreter) - return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) + val provider = FlowSink(engine, 4200.0) + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkForwardOverhead(state: Workload) { + fun benchmarkForward(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, interpreter) - val forwarder = SimResourceForwarder() + val provider = FlowSink(engine, 4200.0) + val forwarder = FlowForwarder(engine) provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) + return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { + fun benchmarkMuxMaxMinSingleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(interpreter) + val switch = MaxMinFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) - val provider = switch.newOutput() - return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { + fun benchmarkMuxMaxMinTripleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(interpreter) + val switch = MaxMinFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) repeat(3) { launch { - val provider = switch.newOutput() - provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) } } } } @Benchmark - fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) { + fun benchmarkMuxExclusiveSingleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchExclusive() + val switch = ForwardingFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) - val provider = switch.newOutput() - return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) } } @Benchmark - fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) { + fun benchmarkMuxExclusiveTripleSource(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchExclusive() + val switch = ForwardingFlowMultiplexer(engine) - switch.addInput(SimResourceSource(3000.0, interpreter)) - switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) repeat(2) { launch { - val provider = switch.newOutput() - provider.consume(SimTraceConsumer(state.trace)) + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index 085cba63..c8092082 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -20,25 +20,22 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow -import org.opendc.simulator.resources.impl.SimResourceCountersImpl +import org.opendc.simulator.flow.internal.FlowCountersImpl /** - * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. + * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. */ -public abstract class SimAbstractResourceProvider( - private val interpreter: SimResourceInterpreter, - initialCapacity: Double -) : SimResourceProvider { +public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { /** - * A flag to indicate that the resource provider is active. + * A flag to indicate that the flow consumer is active. */ public override val isActive: Boolean get() = ctx != null /** - * The capacity of the resource. + * The capacity of the consumer. */ public override var capacity: Double = initialCapacity set(value) { @@ -47,51 +44,51 @@ public abstract class SimAbstractResourceProvider( } /** - * The current processing speed of the resource. + * The current processing rate of the consumer. */ - public override val speed: Double - get() = ctx?.speed ?: 0.0 + public override val rate: Double + get() = ctx?.rate ?: 0.0 /** - * The resource processing speed demand at this instant. + * The flow processing rate demand at this instant. */ public override val demand: Double get() = ctx?.demand ?: 0.0 /** - * The resource counters to track the execution metrics of the resource. + * The flow counters to track the flow metrics of the consumer. */ - public override val counters: SimResourceCounters + public override val counters: FlowCounters get() = _counters - private val _counters = SimResourceCountersImpl() + private val _counters = FlowCountersImpl() /** - * The [SimResourceControllableContext] that is currently running. + * The [FlowConsumerContext] that is currently running. */ - protected var ctx: SimResourceControllableContext? = null + protected var ctx: FlowConsumerContext? = null private set /** - * Construct the [SimResourceProviderLogic] instance for a new consumer. + * Construct the [FlowConsumerLogic] instance for a new source. */ - protected abstract fun createLogic(): SimResourceProviderLogic + protected abstract fun createLogic(): FlowConsumerLogic /** - * Start the specified [SimResourceControllableContext]. + * Start the specified [FlowConsumerContext]. */ - protected open fun start(ctx: SimResourceControllableContext) { + protected open fun start(ctx: FlowConsumerContext) { ctx.start() } /** - * The previous demand for the resource. + * The previous demand for the consumer. */ private var previousDemand = 0.0 /** - * Update the counters of the resource provider. + * Update the counters of the flow consumer. */ - protected fun updateCounters(ctx: SimResourceContext, delta: Long) { + protected fun updateCounters(ctx: FlowConnection, delta: Long) { val demand = previousDemand previousDemand = ctx.demand @@ -102,7 +99,7 @@ public abstract class SimAbstractResourceProvider( val counters = _counters val deltaS = delta / 1000.0 val work = demand * deltaS - val actualWork = ctx.speed * deltaS + val actualWork = ctx.rate * deltaS val remainingWork = work - actualWork counters.demand += work @@ -111,7 +108,7 @@ public abstract class SimAbstractResourceProvider( } /** - * Update the counters of the resource provider. + * Update the counters of the flow consumer. */ protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { val counters = _counters @@ -120,9 +117,9 @@ public abstract class SimAbstractResourceProvider( counters.overcommit += overcommit } - final override fun startConsumer(consumer: SimResourceConsumer) { - check(ctx == null) { "Resource is in invalid state" } - val ctx = interpreter.newContext(consumer, createLogic()) + final override fun startConsumer(source: FlowSource) { + check(ctx == null) { "Consumer is in invalid state" } + val ctx = engine.newContext(source, createLogic()) ctx.capacity = capacity this.ctx = ctx @@ -130,8 +127,8 @@ public abstract class SimAbstractResourceProvider( start(ctx) } - final override fun interrupt() { - ctx?.interrupt() + final override fun pull() { + ctx?.pull() } final override fun cancel() { @@ -142,5 +139,5 @@ public abstract class SimAbstractResourceProvider( } } - override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" + override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index 225cae0b..fa833961 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -20,49 +20,41 @@ * SOFTWARE. */ -package org.opendc.simulator.resources - -import java.time.Clock +package org.opendc.simulator.flow /** - * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a - * resource and a resource consumer. + * An active connection between a [FlowSource] and [FlowConsumer]. */ -public interface SimResourceContext : AutoCloseable { - /** - * The virtual clock tracking simulation time. - */ - public val clock: Clock - +public interface FlowConnection : AutoCloseable { /** - * The resource capacity available at this instant. + * The capacity of the connection. */ public val capacity: Double /** - * The resource processing speed at this instant. + * The flow rate over the connection. */ - public val speed: Double + public val rate: Double /** - * The resource processing speed demand at this instant. + * The flow demand of the source. */ public val demand: Double /** - * Ask the resource provider to interrupt its resource. + * Pull the source. */ - public fun interrupt() + public fun pull() /** - * Push the given flow to this context. + * Push the given flow [rate] over this connection. * * @param rate The rate of the flow to push. */ public fun push(rate: Double) /** - * Stop the resource context. + * Disconnect the consumer from its source. */ public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index b68b7261..3a6e2e97 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -20,77 +20,80 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException /** - * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. + * A consumer of a [FlowSource]. */ -public interface SimResourceProvider { +public interface FlowConsumer { /** - * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer]. + * A flag to indicate that the consumer is currently consuming a [FlowSource]. */ public val isActive: Boolean /** - * The resource capacity available at this instant. + * The flow capacity of this consumer. */ public val capacity: Double /** - * The current processing speed of the resource. + * The current flow rate of the consumer. */ - public val speed: Double + public val rate: Double /** - * The resource processing speed demand at this instant. + * The current flow demand. */ public val demand: Double /** - * The resource counters to track the execution metrics of the resource. + * The flow counters to track the flow metrics of the consumer. */ - public val counters: SimResourceCounters + public val counters: FlowCounters /** - * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. + * Start consuming the specified [source]. * - * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. + * @throws IllegalStateException if the consumer is already active. */ - public fun startConsumer(consumer: SimResourceConsumer) + public fun startConsumer(source: FlowSource) /** - * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op. + * Ask the consumer to pull its source. + * + * If the consumer is not active, this operation will be a no-op. */ - public fun interrupt() + public fun pull() /** - * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op. + * Disconnect the consumer from its source. + * + * If the consumer is not active, this operation will be a no-op. */ public fun cancel() } /** - * Consume the resource provided by this provider using the specified [consumer] and suspend execution until - * the consumer has finished. + * Consume the specified [source] and suspend execution until the source is fully consumed or failed. */ -public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { +public suspend fun FlowConsumer.consume(source: FlowSource) { return suspendCancellableCoroutine { cont -> - startConsumer(object : SimResourceConsumer by consumer { - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - consumer.onEvent(ctx, event) + startConsumer(object : FlowSource by source { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + source.onEvent(conn, now, event) - if (event == SimResourceEvent.Exit && !cont.isCompleted) { + if (event == FlowEvent.Exit && !cont.isCompleted) { cont.resume(Unit) } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { try { - consumer.onFailure(ctx, cause) + source.onFailure(conn, cause) cont.resumeWithException(cause) } catch (e: Throwable) { e.addSuppressed(cause) @@ -98,7 +101,7 @@ public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { } } - override fun toString(): String = "SimSuspendingResourceConsumer" + override fun toString(): String = "SuspendingFlowSource" }) cont.invokeOnCancellation { cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt new file mode 100644 index 00000000..75b2d25b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A controllable [FlowConnection]. + * + * This interface is used by [FlowConsumer]s to control the connection between it and the source. + */ +public interface FlowConsumerContext : FlowConnection { + /** + * The capacity of the connection. + */ + public override var capacity: Double + + /** + * Start the flow over the connection. + */ + public fun start() + + /** + * Synchronously flush the changes of the connection. + */ + public fun flush() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index cc718165..c69cb17e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -20,23 +20,21 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A collection of callbacks associated with a flow stage. + * A collection of callbacks associated with a [FlowConsumer]. */ -public interface SimResourceProviderLogic { +public interface FlowConsumerLogic { /** - * This method is invoked when the consumer ask to consume the resource for the specified [duration]. + * This method is invoked when a [FlowSource] changes the rate of flow to this consumer. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds. - * @param limit The limit on the work rate of the resource consumer. - * @param duration The duration of the consumption in milliseconds. - * @return The deadline of the resource consumption. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + * @param rate The requested processing rate of the source. */ - public fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {} + public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -45,14 +43,14 @@ public interface SimResourceProviderLogic { * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {} + public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} /** - * This method is invoked when the resource consumer has finished. + * This method is invoked when the [FlowSource] is completed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. */ - public fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index 11924db2..e15d7643 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -20,34 +20,34 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * An interface that tracks cumulative counts of the work performed by a resource. + * An interface that tracks cumulative counts of the flow accumulation over a stage. */ -public interface SimResourceCounters { +public interface FlowCounters { /** - * The amount of work that resource consumers wanted the resource to perform. + * The accumulated flow that a source wanted to push over the connection. */ public val demand: Double /** - * The amount of work performed by the resource. + * The accumulated flow that was actually transferred over the connection. */ public val actual: Double /** - * The amount of work that could not be completed due to overcommitted resources. + * The accumulated flow that could not be transferred over the connection. */ public val overcommit: Double /** - * The amount of work lost due to interference. + * The accumulated flow lost due to interference between sources. */ public val interference: Double /** - * Reset the resource counters. + * Reset the flow counters. */ public fun reset() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt index 4bfeaf20..65224827 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt @@ -20,31 +20,31 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl import java.time.Clock import kotlin.coroutines.CoroutineContext /** - * The resource interpreter is responsible for managing the interaction between resource consumer and provider. + * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s. * - * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation + * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. */ -public interface SimResourceInterpreter { +public interface FlowEngine { /** - * The [Clock] associated with this interpreter. + * The virtual [Clock] associated with this engine. */ public val clock: Clock /** - * Create a new [SimResourceControllableContext] with the given [provider]. + * Create a new [FlowConsumerContext] with the given [provider]. * * @param consumer The consumer logic. * @param provider The logic of the resource provider. */ - public fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext + public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext /** * Start batching the execution of resource updates until [popBatch] is called. @@ -67,14 +67,15 @@ public interface SimResourceInterpreter { public companion object { /** - * Construct a new [SimResourceInterpreter] implementation. + * Construct a new [FlowEngine] implementation. * * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ + @JvmStatic @JvmName("create") - public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter { - return SimResourceInterpreterImpl(context, clock) + public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine { + return FlowEngineImpl(context, clock) } } } @@ -84,7 +85,7 @@ public interface SimResourceInterpreter { * * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. */ -public inline fun SimResourceInterpreter.batch(block: () -> Unit) { +public inline fun FlowEngine.batch(block: () -> Unit) { try { pushBatch() block() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt index 959427f1..14c85183 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt @@ -20,29 +20,29 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A resource event that is communicated to the resource consumer. + * A flow event that is communicated to a [FlowSource]. */ -public enum class SimResourceEvent { +public enum class FlowEvent { /** - * This event is emitted to the consumer when it has started. + * This event is emitted to the source when it has started. */ Start, /** - * This event is emitted to the consumer when it has exited. + * This event is emitted to the source when it is stopped. */ Exit, /** - * This event is emitted to the consumer when it has started a new resource consumption or idle cycle. + * This event is emitted to the source when the system has converged into a steady state. */ - Run, + Converge, /** - * This event is emitted to the consumer when the capacity of the resource has changed. + * This event is emitted to the source when the capacity of the consumer has changed. */ Capacity, } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 0cd2bfc7..2074033e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -20,21 +20,21 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow -import org.opendc.simulator.resources.impl.SimResourceCountersImpl -import java.time.Clock +import org.opendc.simulator.flow.internal.FlowCountersImpl /** - * A class that acts as a [SimResourceConsumer] and [SimResourceProvider] at the same time. + * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. * + * @param engine The [FlowEngine] the forwarder runs in. * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. */ -public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimResourceConsumer, SimResourceProvider, AutoCloseable { +public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { /** - * The delegate [SimResourceConsumer]. + * The delegate [FlowSource]. */ - private var delegate: SimResourceConsumer? = null + private var delegate: FlowSource? = null /** * A flag to indicate that the delegate was started. @@ -42,28 +42,25 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR private var hasDelegateStarted: Boolean = false /** - * The exposed [SimResourceContext]. + * The exposed [FlowConnection]. */ - private val _ctx = object : SimResourceContext { - override val clock: Clock - get() = _innerCtx!!.clock - + private val _ctx = object : FlowConnection { override val capacity: Double get() = _innerCtx?.capacity ?: 0.0 override val demand: Double get() = _innerCtx?.demand ?: 0.0 - override val speed: Double - get() = _innerCtx?.speed ?: 0.0 + override val rate: Double + get() = _innerCtx?.rate ?: 0.0 - override fun interrupt() { - _innerCtx?.interrupt() + override fun pull() { + _innerCtx?.pull() } override fun push(rate: Double) { _innerCtx?.push(rate) - _limit = rate + _demand = rate } override fun close() { @@ -78,14 +75,14 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR // reset beforehand the existing state and check whether it has been updated afterwards reset() - delegate.onEvent(this, SimResourceEvent.Exit) + delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) } } /** - * The [SimResourceContext] in which the forwarder runs. + * The [FlowConnection] in which the forwarder runs. */ - private var _innerCtx: SimResourceContext? = null + private var _innerCtx: FlowConnection? = null override val isActive: Boolean get() = delegate != null @@ -93,27 +90,27 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR override val capacity: Double get() = _ctx.capacity - override val speed: Double - get() = _ctx.speed + override val rate: Double + get() = _ctx.rate override val demand: Double get() = _ctx.demand - override val counters: SimResourceCounters + override val counters: FlowCounters get() = _counters - private val _counters = SimResourceCountersImpl() + private val _counters = FlowCountersImpl() - override fun startConsumer(consumer: SimResourceConsumer) { - check(delegate == null) { "Resource transformer already active" } + override fun startConsumer(source: FlowSource) { + check(delegate == null) { "Forwarder already active" } - delegate = consumer + delegate = source - // Interrupt the provider to replace the consumer - interrupt() + // Pull to replace the source + pull() } - override fun interrupt() { - _ctx.interrupt() + override fun pull() { + _ctx.pull() } override fun cancel() { @@ -124,7 +121,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR this.delegate = null if (ctx != null) { - delegate.onEvent(this._ctx, SimResourceEvent.Exit) + delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit) } } } @@ -134,41 +131,41 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR if (ctx != null) { this._innerCtx = null - ctx.interrupt() + ctx.pull() } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val delegate = delegate if (!hasDelegateStarted) { start() } - updateCounters(ctx, delta) + updateCounters(conn, delta) - return delegate?.onNext(this._ctx, now, delta) ?: Long.MAX_VALUE + return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> { - _innerCtx = ctx + FlowEvent.Start -> { + _innerCtx = conn } - SimResourceEvent.Exit -> { + FlowEvent.Exit -> { _innerCtx = null val delegate = delegate if (delegate != null) { reset() - delegate.onEvent(this._ctx, SimResourceEvent.Exit) + delegate.onEvent(this._ctx, now, FlowEvent.Exit) } } - else -> delegate?.onEvent(this._ctx, event) + else -> delegate?.onEvent(this._ctx, now, event) } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { _innerCtx = null val delegate = delegate @@ -183,7 +180,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR */ private fun start() { val delegate = delegate ?: return - delegate.onEvent(checkNotNull(_innerCtx), SimResourceEvent.Start) + delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start) hasDelegateStarted = true } @@ -197,22 +194,22 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR } /** - * The requested speed. + * The requested flow rate. */ - private var _limit: Double = 0.0 + private var _demand: Double = 0.0 /** - * Update the resource counters for the transformer. + * Update the flow counters for the transformer. */ - private fun updateCounters(ctx: SimResourceContext, delta: Long) { + private fun updateCounters(ctx: FlowConnection, delta: Long) { if (delta <= 0) { return } val counters = _counters val deltaS = delta / 1000.0 - val work = _limit * deltaS - val actualWork = ctx.speed * deltaS + val work = _demand * deltaS + val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork counters.overcommit += (work - actualWork) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index c8d4cf0d..fb6ca85d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -20,42 +20,42 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. + * A [FlowSink] represents a sink with a fixed capacity. * * @param initialCapacity The initial capacity of the resource. - * @param interpreter The interpreter that is used for managing the resource contexts. - * @param parent The parent resource system. + * @param engine The engine that is used for driving the flow simulation. + * @param parent The parent flow system. */ -public class SimResourceSource( +public class FlowSink( + private val engine: FlowEngine, initialCapacity: Double, - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null -) : SimAbstractResourceProvider(interpreter, initialCapacity) { - override fun createLogic(): SimResourceProviderLogic { - return object : SimResourceProviderLogic { - override fun onConsume( - ctx: SimResourceControllableContext, + private val parent: FlowSystem? = null +) : AbstractFlowConsumer(engine, initialCapacity) { + + override fun createLogic(): FlowConsumerLogic { + return object : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, now: Long, delta: Long, - limit: Double, - duration: Long + rate: Double ) { updateCounters(ctx, delta) } - override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { updateCounters(ctx, delta) cancel() } - override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { parent?.onConverge(now) } } } - override fun toString(): String = "SimResourceSource[capacity=$capacity]" + override fun toString(): String = "FlowSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 0b25358a..077b4d38 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -20,38 +20,39 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** - * A [SimResourceConsumer] characterizes how a resource is consumed. + * A source of flow that is consumed by a [FlowConsumer]. * - * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) - * for multiple resource providers, unless explicitly said otherwise. + * Implementations of this interface should be considered stateful and must be assumed not to be re-usable + * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. */ -public interface SimResourceConsumer { +public interface FlowSource { /** - * This method is invoked when the resource provider is pulling this resource consumer. + * This method is invoked when the source is pulled. * - * @param ctx The execution context in which the consumer runs. - * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call in milliseconds. + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the pull is occurring. + * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. * @return The duration after which the resource consumer should be pulled again. */ - public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long + public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long /** * This method is invoked when an event has occurred. * - * @param ctx The execution context in which the consumer runs. + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the event is occurring. * @param event The event that has occurred. */ - public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {} + public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} /** - * This method is invoked when a resource consumer throws an exception. + * This method is invoked when the source throws an exception. * - * @param ctx The execution context in which the consumer runs. + * @param conn The connection between the source and consumer. * @param cause The cause of the failure. */ - public fun onFailure(ctx: SimResourceContext, cause: Throwable) {} + public fun onFailure(conn: FlowConnection, cause: Throwable) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt index 609262cb..db6aa69f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow /** * A system of possible multiple sub-resources. @@ -28,11 +28,11 @@ package org.opendc.simulator.resources * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the * resource provider. */ -public interface SimResourceSystem { +public interface FlowSystem { /** * The parent system to which this system belongs or `null` if it has no parent. */ - public val parent: SimResourceSystem? + public val parent: FlowSystem? /** * This method is invoked when the system has converged to a steady-state. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt index 1066777f..aa2713b6 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt @@ -1,10 +1,10 @@ -package org.opendc.simulator.resources.interference +package org.opendc.simulator.flow.interference -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.flow.FlowSource /** - * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur - * performance variability due to operating on the same resources and therefore causing interference. + * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur + * performance variability due to operating on the same resource and therefore causing interference. */ public interface InterferenceDomain { /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt index 8b12e7b4..d28ebde5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.interference +package org.opendc.simulator.flow.interference /** * A key that uniquely identifies a participant of an interference domain. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index cbfa7afd..9f3afc4d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -20,28 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.impl +package org.opendc.simulator.flow.internal -import org.opendc.simulator.resources.* -import java.time.Clock +import org.opendc.simulator.flow.* import java.util.ArrayDeque import kotlin.math.max import kotlin.math.min /** - * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. */ -internal class SimResourceContextImpl( - private val interpreter: SimResourceInterpreterImpl, - private val consumer: SimResourceConsumer, - private val logic: SimResourceProviderLogic -) : SimResourceControllableContext { +internal class FlowConsumerContextImpl( + private val engine: FlowEngineImpl, + private val source: FlowSource, + private val logic: FlowConsumerLogic +) : FlowConsumerContext { /** - * The clock of the context. + * The clock to track simulation time. */ - override val clock: Clock - get() = _clock - private val _clock = interpreter.clock + private val _clock = engine.clock /** * The capacity of the resource. @@ -65,7 +62,7 @@ internal class SimResourceContextImpl( /** * The current processing speed of the resource. */ - override val speed: Double + override val rate: Double get() = _rate private var _rate = 0.0 @@ -101,14 +98,14 @@ internal class SimResourceContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque() + private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque() override fun start() { check(_state == State.Pending) { "Consumer is already started" } - interpreter.batch { - consumer.onEvent(this, SimResourceEvent.Start) + engine.batch { + source.onEvent(this, _clock.millis(), FlowEvent.Start) _state = State.Active - interrupt() + pull() } } @@ -117,36 +114,27 @@ internal class SimResourceContextImpl( return } - interpreter.batch { + engine.batch { _state = State.Stopped if (!_updateActive) { - val now = clock.millis() + val now = _clock.millis() val delta = max(0, now - _lastUpdate) doStop(now, delta) // FIX: Make sure the context converges _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(clock.millis()) + scheduleUpdate(_clock.millis()) } } } - override fun interrupt() { + override fun pull() { if (_state == State.Stopped) { return } _flag = _flag or FLAG_INTERRUPT - scheduleUpdate(clock.millis()) - } - - override fun invalidate() { - if (_state == State.Stopped) { - return - } - - _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(clock.millis()) + scheduleUpdate(_clock.millis()) } override fun flush() { @@ -154,7 +142,7 @@ internal class SimResourceContextImpl( return } - interpreter.scheduleSync(clock.millis(), this) + engine.scheduleSync(_clock.millis(), this) } override fun push(rate: Double) { @@ -167,7 +155,8 @@ internal class SimResourceContextImpl( // Invalidate only if the active limit is change and no update is active // If an update is active, it will already get picked up at the end of the update if (_activeLimit != rate && !_updateActive) { - invalidate() + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(_clock.millis()) } } @@ -196,7 +185,7 @@ internal class SimResourceContextImpl( val delta = max(0, now - lastUpdate) try { - val duration = consumer.onNext(this, now, delta) + val duration = source.onPull(this, now, delta) val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration // Reset update flags @@ -205,7 +194,7 @@ internal class SimResourceContextImpl( // Check whether the state has changed after [consumer.onNext] when (_state) { State.Active -> { - logic.onConsume(this, now, delta, _limit, duration) + logic.onPush(this, now, delta, _limit) // Schedule an update at the new deadline scheduleUpdate(now, newDeadline) @@ -261,7 +250,7 @@ internal class SimResourceContextImpl( try { if (_state == State.Active) { - consumer.onEvent(this, SimResourceEvent.Run) + source.onEvent(this, timestamp, FlowEvent.Converge) } logic.onConverge(this, timestamp, delta) @@ -270,14 +259,14 @@ internal class SimResourceContextImpl( } } - override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]" + override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" /** * Stop the resource context. */ private fun doStop(now: Long, delta: Long) { try { - consumer.onEvent(this, SimResourceEvent.Exit) + source.onEvent(this, now, FlowEvent.Exit) logic.onFinish(this, now, delta) } catch (cause: Throwable) { doFail(now, delta, cause) @@ -292,7 +281,7 @@ internal class SimResourceContextImpl( */ private fun doFail(now: Long, delta: Long, cause: Throwable) { try { - consumer.onFailure(this, cause) + source.onFailure(this, cause) } catch (e: Throwable) { e.addSuppressed(cause) e.printStackTrace() @@ -310,11 +299,11 @@ internal class SimResourceContextImpl( return } - interpreter.batch { + engine.batch { // Inform the consumer of the capacity change. This might already trigger an interrupt. - consumer.onEvent(this, SimResourceEvent.Capacity) + source.onEvent(this, _clock.millis(), FlowEvent.Capacity) - interrupt() + pull() } } @@ -322,7 +311,7 @@ internal class SimResourceContextImpl( * Schedule an update for this resource context. */ private fun scheduleUpdate(now: Long) { - interpreter.scheduleImmediate(now, this) + engine.scheduleImmediate(now, this) } /** @@ -331,7 +320,7 @@ internal class SimResourceContextImpl( private fun scheduleUpdate(now: Long, target: Long) { val timers = _timers if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { - timers.addFirst(interpreter.scheduleDelayed(now, this, target)) + timers.addFirst(engine.scheduleDelayed(now, this, target)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 01062179..141d335d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -20,14 +20,14 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.impl +package org.opendc.simulator.flow.internal -import org.opendc.simulator.resources.SimResourceCounters +import org.opendc.simulator.flow.FlowCounters /** - * Mutable implementation of the [SimResourceCounters] interface. + * Mutable implementation of the [FlowCounters] interface. */ -internal class SimResourceCountersImpl : SimResourceCounters { +internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 override var overcommit: Double = 0.0 @@ -41,6 +41,6 @@ internal class SimResourceCountersImpl : SimResourceCounters { } override fun toString(): String { - return "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 2abf0749..1a50da2c 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -20,25 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.impl +package org.opendc.simulator.flow.internal import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* import java.time.Clock import java.util.* import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext /** - * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after. + * Internal implementation of the [FlowEngine] interface. * * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter { +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -46,24 +46,24 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } /** - * The queue of resource updates that are scheduled for immediate execution. + * The queue of connection updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque<SimResourceContextImpl>() + private val queue = ArrayDeque<FlowConsumerContextImpl>() /** - * A priority queue containing the resource updates to be scheduled in the future. + * A priority queue containing the connection updates to be scheduled in the future. */ private val futureQueue = PriorityQueue<Timer>() /** - * The stack of interpreter invocations to occur in the future. + * The stack of engine invocations to occur in the future. */ private val futureInvocations = ArrayDeque<Invocation>() /** - * The systems that have been visited during the interpreter cycle. + * The systems that have been visited during the engine cycle. */ - private val visited = linkedSetOf<SimResourceContextImpl>() + private val visited = linkedSetOf<FlowConsumerContextImpl>() /** * The index in the batch stack. @@ -71,7 +71,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, private var batchIndex = 0 /** - * A flag to indicate that the interpreter is currently active. + * A flag to indicate that the engine is currently active. */ private val isRunning: Boolean get() = batchIndex > 0 @@ -79,57 +79,57 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * Update the specified [ctx] synchronously. */ - fun scheduleSync(now: Long, ctx: SimResourceContextImpl) { + fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { ctx.doUpdate(now) visited.add(ctx) - // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked - // up by the active interpreter. + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. if (isRunning) { return } try { batchIndex++ - runInterpreter(now) + runEngine(now) } finally { batchIndex-- } } /** - * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. + * Enqueue the specified [ctx] to be updated immediately during the active engine cycle. * - * This method should be used when the state of a resource context is invalidated/interrupted and needs to be - * re-computed. In case no interpreter is currently active, the interpreter will be started. + * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. In case no engine is currently active, the engine will be started. */ - fun scheduleImmediate(now: Long, ctx: SimResourceContextImpl) { + fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) { queue.add(ctx) - // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked - // up by the active interpreter. + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. if (isRunning) { return } try { batchIndex++ - runInterpreter(now) + runEngine(now) } finally { batchIndex-- } } /** - * Schedule the interpreter to run at [target] to update the resource contexts. + * Schedule the engine to run at [target] to update the flow contexts. * * This method will override earlier calls to this method for the same [ctx]. * * @param now The current virtual timestamp. - * @param ctx The resource context to which the event applies. + * @param ctx The flow context to which the event applies. * @param target The timestamp when the interrupt should happen. */ - fun scheduleDelayed(now: Long, ctx: SimResourceContextImpl, target: Long): Timer { + fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer { val futureQueue = futureQueue require(target >= now) { "Timestamp must be in the future" } @@ -140,7 +140,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, return timer } - override fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext = SimResourceContextImpl(this, consumer, provider) + override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) override fun pushBatch() { batchIndex++ @@ -150,7 +150,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, try { // Flush the work if the platform is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - runInterpreter(clock.millis()) + runEngine(clock.millis()) } } finally { batchIndex-- @@ -158,9 +158,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * Interpret all actions that are scheduled for the current timestamp. + * Run all the enqueued actions for the specified [timestamp][now]. */ - private fun runInterpreter(now: Long) { + private fun runEngine(now: Long) { val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations @@ -219,7 +219,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, visited.clear() } while (queue.isNotEmpty()) - // Schedule an interpreter invocation for the next update to occur. + // Schedule an engine invocation for the next update to occur. val headTimer = futureQueue.peek() if (headTimer != null) { trySchedule(now, futureInvocations, headTimer.target) @@ -227,10 +227,10 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * Try to schedule an interpreter invocation at the specified [target]. + * Try to schedule an engine invocation at the specified [target]. * * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the interpreter invocation should happen. + * @param target The virtual timestamp at which the engine invocation should happen. * @param scheduled The queue of scheduled invocations. */ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { @@ -245,7 +245,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, { try { batchIndex++ - runInterpreter(target) + runEngine(target) } finally { batchIndex-- } @@ -267,9 +267,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * A future interpreter invocation. + * A future engine invocation. * - * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case * the invocation is not needed anymore, it can be cancelled via [cancel]. */ private data class Invocation( @@ -277,7 +277,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, @JvmField val handle: DisposableHandle ) { /** - * Cancel the interpreter invocation. + * Cancel the engine invocation. */ fun cancel() = handle.dispose() } @@ -285,10 +285,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * An update call for [ctx] that is scheduled for [target]. * - * This class represents an update in the future at [target] requested by [ctx]. A deferred update might be - * cancelled if the resource context was invalidated in the meantime. + * This class represents an update in the future at [target] requested by [ctx]. */ - class Timer(@JvmField val ctx: SimResourceContextImpl, @JvmField val target: Long) : Comparable<Timer> { + class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> { override fun compareTo(other: Timer): Int { return target.compareTo(other.target) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 3c25b76d..17b82391 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -20,45 +20,48 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.mux -import org.opendc.simulator.resources.interference.InterferenceKey +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowCounters +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.interference.InterferenceKey /** - * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers. + * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. */ -public interface SimResourceSwitch { +public interface FlowMultiplexer { /** - * The output resource providers to which resource consumers can be attached. + * The inputs of the multiplexer that can be used to consume sources. */ - public val outputs: Set<SimResourceProvider> + public val inputs: Set<FlowConsumer> /** - * The input resources that will be switched between the output providers. + * The outputs of the multiplexer over which the flows will be distributed. */ - public val inputs: Set<SimResourceProvider> + public val outputs: Set<FlowConsumer> /** - * The resource counters to track the execution metrics of all switch resources. + * The flow counters to track the flow metrics of all multiplexer inputs. */ - public val counters: SimResourceCounters + public val counters: FlowCounters /** - * Create a new output on the switch. + * Create a new input on this multiplexer. * - * @param key The key of the interference member to which the output belongs. + * @param key The key of the interference member to which the input belongs. */ - public fun newOutput(key: InterferenceKey? = null): SimResourceProvider + public fun newInput(key: InterferenceKey? = null): FlowConsumer /** - * Remove [output] from this switch. + * Remove [input] from this multiplexer. */ - public fun removeOutput(output: SimResourceProvider) + public fun removeInput(input: FlowConsumer) /** - * Add the specified [input] to the switch. + * Add the specified [output] to the multiplexer. */ - public fun addInput(input: SimResourceProvider) + public fun addOutput(output: FlowConsumer) /** * Clear all inputs and outputs from the switch. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt new file mode 100644 index 00000000..811d9460 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey +import java.util.ArrayDeque + +/** + * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means + * that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + * + * @param engine The [FlowEngine] driving the simulation. + */ +public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer { + override val inputs: Set<FlowConsumer> + get() = _inputs + private val _inputs = mutableSetOf<Input>() + + override val outputs: Set<FlowConsumer> + get() = _outputs + private val _outputs = mutableSetOf<FlowConsumer>() + private val _availableOutputs = ArrayDeque<FlowForwarder>() + + override val counters: FlowCounters = object : FlowCounters { + override val demand: Double + get() = _outputs.sumOf { it.counters.demand } + override val actual: Double + get() = _outputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _outputs.sumOf { it.counters.overcommit } + override val interference: Double + get() = _outputs.sumOf { it.counters.interference } + + override fun reset() { + for (input in _outputs) { + input.counters.reset() + } + } + + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val output = Input(forwarder) + _inputs += output + return output + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + + (input as Input).close() + } + + override fun addOutput(output: FlowConsumer) { + if (output in outputs) { + return + } + + val forwarder = FlowForwarder(engine) + + _outputs += output + _availableOutputs += forwarder + + output.startConsumer(object : FlowSource by forwarder { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) { + // De-register the output after it has finished + _outputs -= output + } + + forwarder.onEvent(conn, now, event) + } + }) + } + + override fun clear() { + for (input in _outputs) { + input.cancel() + } + _outputs.clear() + + // Inputs are implicitly cancelled by the output forwarders + _inputs.clear() + } + + /** + * An input on the multiplexer. + */ + private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { + /** + * Close the input. + */ + fun close() { + // We explicitly do not close the forwarder here in order to re-use it across input resources. + _inputs -= this + _availableOutputs += forwarder + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt new file mode 100644 index 00000000..9735f121 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceDomain +import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.FlowCountersImpl +import kotlin.math.max +import kotlin.math.min + +/** + * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing. + * + * @param engine The [FlowEngine] to drive the flow simulation. + * @param parent The parent flow system of the multiplexer. + * @param interferenceDomain The interference domain of the multiplexer. + */ +public class MaxMinFlowMultiplexer( + private val engine: FlowEngine, + private val parent: FlowSystem? = null, + private val interferenceDomain: InterferenceDomain? = null +) : FlowMultiplexer { + /** + * The inputs of the multiplexer. + */ + override val inputs: Set<FlowConsumer> + get() = _inputs + private val _inputs = mutableSetOf<Input>() + private val _activeInputs = mutableListOf<Input>() + + /** + * The outputs of the multiplexer. + */ + override val outputs: Set<FlowConsumer> + get() = _outputs + private val _outputs = mutableSetOf<FlowConsumer>() + private val _activeOutputs = mutableListOf<Output>() + + /** + * The flow counters of this multiplexer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + /** + * The actual processing rate of the multiplexer. + */ + private var _rate = 0.0 + + /** + * The demanded processing rate of the input. + */ + private var _demand = 0.0 + + /** + * The capacity of the outputs. + */ + private var _capacity = 0.0 + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val provider = Input(_capacity, key) + _inputs.add(provider) + return provider + } + + override fun addOutput(output: FlowConsumer) { + val consumer = Output(output) + if (_outputs.add(output)) { + _activeOutputs.add(consumer) + output.startConsumer(consumer) + } + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + // This cast should always succeed since only `Input` instances should be added to `_inputs` + (input as Input).close() + } + + override fun clear() { + for (input in _activeOutputs) { + input.cancel() + } + _activeOutputs.clear() + + for (output in _activeInputs) { + output.cancel() + } + _activeInputs.clear() + } + + /** + * Converge the scheduler of the multiplexer. + */ + private fun runScheduler(now: Long) { + if (_schedulerActive) { + return + } + + _schedulerActive = true + try { + doSchedule(now) + } finally { + _schedulerActive = false + } + } + + /** + * Schedule the inputs over the outputs. + */ + private fun doSchedule(now: Long) { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + return + } + + val capacity = _capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pull(now) + + // Remove outputs that have finished + if (!input.isActive) { + inputIterator.remove() + } + } + + var demand = 0.0 + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + var remaining = activeInputs.size + for (input in activeInputs) { + val availableShare = availableCapacity / remaining-- + val grantedRate = min(input.allowedRate, availableShare) + + // Ignore empty sources + if (grantedRate <= 0.0) { + input.actualRate = 0.0 + continue + } + + input.actualRate = grantedRate + demand += input.limit + availableCapacity -= grantedRate + } + + val rate = capacity - availableCapacity + + _demand = demand + _rate = rate + + // Sort all consumers by their capacity + activeOutputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (output in activeOutputs) { + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + } + + /** + * Recompute the capacity of the multiplexer. + */ + private fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) + + // No-op if the capacity is unchanged + if (_capacity == newCapacity) { + return + } + + _capacity = newCapacity + + for (input in _inputs) { + input.capacity = newCapacity + } + } + + /** + * An internal [FlowConsumer] implementation for multiplexer inputs. + */ + private inner class Input(capacity: Double, val key: InterferenceKey?) : + AbstractFlowConsumer(engine, capacity), + FlowConsumerLogic, + Comparable<Input> { + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + + /** + * A flag to indicate that the input is closed. + */ + private var _isClosed: Boolean = false + + /** + * The timestamp at which we received the last command. + */ + private var _lastPull: Long = Long.MIN_VALUE + + /** + * Close the input. + * + * This method is invoked when the user removes an input from the switch. + */ + fun close() { + _isClosed = true + cancel() + } + + /* AbstractFlowConsumer */ + override fun createLogic(): FlowConsumerLogic = this + + override fun start(ctx: FlowConsumerContext) { + check(!_isClosed) { "Cannot re-use closed input" } + + _activeInputs += this + super.start(ctx) + } + + /* FlowConsumerLogic */ + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + doUpdateCounters(delta) + + actualRate = 0.0 + this.limit = rate + _lastPull = now + + runScheduler(now) + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + doUpdateCounters(delta) + + limit = 0.0 + actualRate = 0.0 + _lastPull = now + } + + /* Comparable */ + override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) + + /** + * Pull the source if necessary. + */ + fun pull(now: Long) { + val ctx = ctx + if (ctx != null && _lastPull < now) { + ctx.flush() + } + } + + /** + * Helper method to update the flow counters of the multiplexer. + */ + private fun doUpdateCounters(delta: Long) { + if (delta <= 0L) { + return + } + + // Compute the performance penalty due to flow interference + val perfScore = if (interferenceDomain != null) { + val load = _rate / capacity + interferenceDomain.apply(key, load) + } else { + 1.0 + } + + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualRate * deltaS + val remainingWork = work - actualWork + + updateCounters(work, actualWork, remainingWork) + + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += remainingWork + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + } + + /** + * An internal [FlowSource] implementation for multiplexer outputs. + */ + private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> { + /** + * The active [FlowConnection] of this source. + */ + private var _ctx: FlowConnection? = null + + /** + * The capacity of this output. + */ + val capacity: Double + get() = _ctx?.capacity ?: 0.0 + + /** + * Push the specified rate to the consumer. + */ + fun push(rate: Double) { + _ctx?.push(rate) + } + + /** + * Cancel this output. + */ + fun cancel() { + provider.cancel() + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + runScheduler(now) + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + assert(_ctx == null) { "Source running concurrently" } + _ctx = conn + updateCapacity() + } + FlowEvent.Exit -> { + _ctx = null + updateCapacity() + } + FlowEvent.Capacity -> updateCapacity() + else -> {} + } + } + + override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt index bf76711f..d9779c6a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -20,41 +20,37 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource import kotlin.math.roundToLong /** - * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. + * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization]. */ -public class SimWorkConsumer( - private val work: Double, - private val utilization: Double -) : SimResourceConsumer { +public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource { init { - require(work >= 0.0) { "Work must be positive" } + require(amount >= 0.0) { "Amount must be positive" } require(utilization > 0.0) { "Utilization must be positive" } } - private var remainingWork = work + private var remainingAmount = amount - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val actualWork = ctx.speed * delta / 1000.0 - val limit = ctx.capacity * utilization + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumed = conn.rate * delta / 1000.0 + val limit = conn.capacity * utilization - remainingWork -= actualWork + remainingAmount -= consumed - val remainingWork = remainingWork - val duration = (remainingWork / limit * 1000).roundToLong() + val duration = (remainingAmount / limit * 1000).roundToLong() return if (duration > 0) { - ctx.push(limit) + conn.push(limit) duration } else { - ctx.close() + conn.close() Long.MAX_VALUE } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt index 52a42241..b3191ad3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt @@ -20,13 +20,13 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source /** - * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to - * complete, before proceeding its operation. + * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to + * finish a pull, before proceeding its operation. */ -public class SimConsumerBarrier(public val parties: Int) { +public class FlowSourceBarrier(public val parties: Int) { private var counter = 0 /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 46885640..7fcc0405 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -20,20 +20,20 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource import kotlin.math.min /** * Helper class to expose an observable [speed] field describing the speed of the consumer. */ -public class SimSpeedConsumerAdapter( - private val delegate: SimResourceConsumer, +public class FlowSourceRateAdapter( + private val delegate: FlowSource, private val callback: (Double) -> Unit = {} -) : SimResourceConsumer by delegate { +) : FlowSource by delegate { /** * The resource processing speed at this instant. */ @@ -49,34 +49,34 @@ public class SimSpeedConsumerAdapter( callback(0.0) } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return delegate.onNext(ctx, now, delta) + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { val oldSpeed = speed - delegate.onEvent(ctx, event) + delegate.onEvent(conn, now, event) when (event) { - SimResourceEvent.Run -> speed = ctx.speed - SimResourceEvent.Capacity -> { + FlowEvent.Converge -> speed = conn.rate + FlowEvent.Capacity -> { // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might // need to update the current speed. if (oldSpeed == speed) { - speed = min(ctx.capacity, speed) + speed = min(conn.capacity, speed) } } - SimResourceEvent.Exit -> speed = 0.0 + FlowEvent.Exit -> speed = 0.0 else -> {} } } - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + override fun onFailure(conn: FlowConnection, cause: Throwable) { speed = 0.0 - delegate.onFailure(ctx, cause) + delegate.onFailure(conn, cause) } - override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" + override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index 4c0e075c..4d3ae61a 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -20,21 +20,20 @@ * SOFTWARE. */ -package org.opendc.simulator.resources.consumer +package org.opendc.simulator.flow.source -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceContext -import org.opendc.simulator.resources.SimResourceEvent +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource /** - * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource - * consumption for some period of time. + * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time. */ -public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { +public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource { private var _iterator: Iterator<Fragment>? = null private var _nextTarget = Long.MIN_VALUE - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget if (nextTarget > now) { @@ -45,21 +44,21 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour return if (iterator.hasNext()) { val fragment = iterator.next() _nextTarget = now + fragment.duration - ctx.push(fragment.usage) + conn.push(fragment.usage) fragment.duration } else { - ctx.close() + conn.close() Long.MAX_VALUE } } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> { - check(_iterator == null) { "Consumer already running" } + FlowEvent.Start -> { + check(_iterator == null) { "Source already running" } _iterator = trace.iterator() } - SimResourceEvent.Exit -> { + FlowEvent.Exit -> { _iterator = null } else -> {} @@ -67,7 +66,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour } /** - * A fragment of the workload. + * A fragment of the tgrace. */ public data class Fragment(val duration: Long, val usage: Double) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt new file mode 100644 index 00000000..061ebea6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.* +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowConsumerContextImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource + +/** + * A test suite for the [FlowConsumerContextImpl] class. + */ +class FlowConsumerContextTest { + @Test + fun testFlushWithoutCommand() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + engine.scheduleSync(engine.clock.millis(), context) + } + + @Test + fun testIntermediateFlush() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = FixedFlowSource(1.0, 1.0) + + val logic = spyk(object : FlowConsumerLogic {}) + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 1.0 + + context.start() + delay(1) // Delay 1 ms to prevent hitting the fast path + engine.scheduleSync(engine.clock.millis(), context) + + verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) } + } + + @Test + fun testDoubleStart() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(0.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + assertThrows<IllegalStateException> { + context.start() + } + } + + @Test + fun testIdempotentCapacityChange() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + }) + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 4200.0 + context.start() + context.capacity = 4200.0 + + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testFailureNoInfiniteLoop() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) throw IllegalStateException("onEvent") + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + throw IllegalStateException("onFailure") + } + }) + + val logic = object : FlowConsumerLogic {} + + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + delay(1) + + verify(exactly = 1) { consumer.onFailure(any(), any()) } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 49e60f68..cbc48a4e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import io.mockk.spyk import io.mockk.verify @@ -29,24 +29,24 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource /** - * A test suite for the [SimResourceForwarder] class. + * A test suite for the [FlowForwarder] class. */ -internal class SimResourceForwarderTest { +internal class FlowForwarderTest { @Test fun testCancelImmediately() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) launch { source.consume(forwarder) } - forwarder.consume(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } }) @@ -57,22 +57,22 @@ internal class SimResourceForwarderTest { @Test fun testCancel() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) launch { source.consume(forwarder) } - forwarder.consume(object : SimResourceConsumer { + forwarder.consume(object : FlowSource { var isFirst = true - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) 10 * 1000 } else { - ctx.close() + conn.close() Long.MAX_VALUE } } @@ -84,10 +84,11 @@ internal class SimResourceForwarderTest { @Test fun testState() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } } @@ -108,11 +109,12 @@ internal class SimResourceForwarderTest { @Test fun testCancelPendingDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } }) @@ -120,16 +122,16 @@ internal class SimResourceForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testCancelStartedDelegate() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) - val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -137,17 +139,17 @@ internal class SimResourceForwarderTest { yield() forwarder.cancel() - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testCancelPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) - val consumer = spyk(SimWorkConsumer(2000.0, 1.0)) + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) source.startConsumer(forwarder) yield() @@ -155,19 +157,19 @@ internal class SimResourceForwarderTest { yield() source.cancel() - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testExitPropagation() = runBlockingSimulation { - val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(2000.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } } @@ -181,11 +183,11 @@ internal class SimResourceForwarderTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + val consumer = spyk(FixedFlowSource(2.0, 1.0)) source.startConsumer(forwarder) coroutineScope { @@ -195,16 +197,16 @@ internal class SimResourceForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } } @Test fun testCounters() = runBlockingSimulation { - val forwarder = SimResourceForwarder() - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val source = SimResourceSource(1.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) - val consumer = SimWorkConsumer(2.0, 1.0) + val consumer = FixedFlowSource(2.0, 1.0) source.startConsumer(forwarder) forwarder.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index e055daf7..010a985e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow import io.mockk.every import io.mockk.mockk @@ -30,24 +30,24 @@ import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter /** - * A test suite for the [SimResourceSource] class. + * A test suite for the [FlowSink] class. */ -internal class SimResourceSourceTest { +internal class FlowSinkTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(4200.0, 1.0) + val consumer = FixedFlowSource(4200.0, 1.0) val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val adapter = FlowSourceRateAdapter(consumer, res::add) provider.consume(adapter) @@ -56,10 +56,10 @@ internal class SimResourceSourceTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val provider = SimResourceSource(1.0, scheduler) + val engine = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(engine, 1.0) - val consumer = spyk(SimWorkConsumer(2.0, 1.0)) + val consumer = spyk(FixedFlowSource(2.0, 1.0)) coroutineScope { launch { provider.consume(consumer) } @@ -67,19 +67,19 @@ internal class SimResourceSourceTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } } @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(capacity, 2.0) + val consumer = FixedFlowSource(capacity, 2.0) val res = mutableListOf<Double>() - val adapter = SimSpeedConsumerAdapter(consumer, res::add) + val adapter = FlowSourceRateAdapter(consumer, res::add) provider.consume(adapter) @@ -87,23 +87,23 @@ internal class SimResourceSourceTest { } /** - * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or - * [SimResourceConsumer.onNext]. + * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or + * [FlowSource.onPull]. */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() return Long.MAX_VALUE } - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - ctx.interrupt() + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + conn.pull() } } @@ -112,28 +112,28 @@ internal class SimResourceSourceTest { @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) - lateinit var resCtx: SimResourceContext + val provider = FlowSink(engine, capacity) + lateinit var resCtx: FlowConnection - val consumer = object : SimResourceConsumer { + val consumer = object : FlowSource { var isFirst = true - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> resCtx = ctx + FlowEvent.Start -> resCtx = conn else -> {} } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) 4000 } else { - ctx.close() + conn.close() Long.MAX_VALUE } } @@ -141,7 +141,7 @@ internal class SimResourceSourceTest { launch { yield() - resCtx.interrupt() + resCtx.pull() } provider.consume(consumer) @@ -150,12 +150,12 @@ internal class SimResourceSourceTest { @Test fun testFailure() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } + val consumer = mockk<FlowSource>(relaxUnitFun = true) + every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) } .throws(IllegalStateException()) assertThrows<IllegalStateException> { @@ -165,17 +165,17 @@ internal class SimResourceSourceTest { @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = object : SimResourceConsumer { + val consumer = object : FlowSource { var isFirst = true - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) 1000 } else { throw IllegalStateException() @@ -190,11 +190,11 @@ internal class SimResourceSourceTest { @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(capacity, 1.0) + val consumer = FixedFlowSource(capacity, 1.0) assertThrows<IllegalStateException> { coroutineScope { @@ -206,11 +206,11 @@ internal class SimResourceSourceTest { @Test fun testCancelDuringConsumption() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = SimWorkConsumer(capacity, 1.0) + val consumer = FixedFlowSource(capacity, 1.0) launch { provider.consume(consumer) } delay(500) @@ -225,12 +225,12 @@ internal class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows<IllegalStateException> { runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, scheduler) + val provider = FlowSink(engine, capacity) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE } provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt index 49f2da5f..b503087e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.mux import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals @@ -28,13 +28,14 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource /** - * Test suite for the [SimResourceSwitchExclusive] class. + * Test suite for the [ForwardingFlowMultiplexer] class. */ internal class SimResourceSwitchExclusiveTest { /** @@ -42,29 +43,29 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val speed = mutableListOf<Double>() val duration = 5 * 60L val workload = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) ), ) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) - val forwarder = SimResourceForwarder() - val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) source.startConsumer(adapter) - switch.addInput(forwarder) + switch.addOutput(forwarder) - val provider = switch.newOutput() + val provider = switch.newInput() provider.consume(workload) yield() @@ -79,17 +80,17 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = SimWorkConsumer(duration * 3.2, 1.0) + val workload = FixedFlowSource(duration * 3.2, 1.0) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) - switch.addInput(source) + switch.addOutput(source) - val provider = switch.newOutput() + val provider = switch.newInput() provider.consume(workload) yield() @@ -101,37 +102,37 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 - val workload = object : SimResourceConsumer { + val workload = object : FlowSource { var isFirst = true - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - SimResourceEvent.Start -> isFirst = true + FlowEvent.Start -> isFirst = true else -> {} } } - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return if (isFirst) { isFirst = false - ctx.push(1.0) + conn.push(1.0) duration } else { - ctx.close() + conn.close() Long.MAX_VALUE } } } - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) - switch.addInput(source) + switch.addOutput(source) - val provider = switch.newOutput() + val provider = switch.newInput() provider.consume(workload) yield() provider.consume(workload) @@ -143,14 +144,14 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val engine = FlowEngineImpl(coroutineContext, clock) - val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, scheduler) + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) - switch.addInput(source) + switch.addOutput(source) - switch.newOutput() - assertThrows<IllegalStateException> { switch.newOutput() } + switch.newInput() + assertThrows<IllegalStateException> { switch.newInput() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt index 03f90e21..089a8d78 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.mux import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -28,24 +28,26 @@ import kotlinx.coroutines.yield import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.TraceFlowSource /** - * Test suite for the [SimResourceSwitch] implementations + * Test suite for the [FlowMultiplexer] implementations */ internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(scheduler) + val scheduler = FlowEngineImpl(coroutineContext, clock) + val switch = MaxMinFlowMultiplexer(scheduler) - val sources = List(2) { SimResourceSource(2000.0, scheduler) } - sources.forEach { switch.addInput(it) } + val sources = List(2) { FlowSink(scheduler, 2000.0) } + sources.forEach { switch.addOutput(it) } - val provider = switch.newOutput() - val consumer = SimWorkConsumer(2000.0, 1.0) + val provider = switch.newInput() + val consumer = FixedFlowSource(2000.0, 1.0) try { provider.consume(consumer) @@ -60,24 +62,24 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val scheduler = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L val workload = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) ), ) - val switch = SimResourceSwitchMaxMin(scheduler) - val provider = switch.newOutput() + val switch = MaxMinFlowMultiplexer(scheduler) + val provider = switch.newInput() try { - switch.addInput(SimResourceSource(3200.0, scheduler)) + switch.addOutput(FlowSink(scheduler, 3200.0)) provider.consume(workload) yield() } finally { @@ -97,34 +99,34 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val scheduler = FlowEngineImpl(coroutineContext, clock) val duration = 5 * 60L val workloadA = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3500.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 183.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) ), ) val workloadB = - SimTraceConsumer( + TraceFlowSource( sequenceOf( - SimTraceConsumer.Fragment(duration * 1000, 28.0), - SimTraceConsumer.Fragment(duration * 1000, 3100.0), - SimTraceConsumer.Fragment(duration * 1000, 0.0), - SimTraceConsumer.Fragment(duration * 1000, 73.0) + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3100.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 73.0) ) ) - val switch = SimResourceSwitchMaxMin(scheduler) - val providerA = switch.newOutput() - val providerB = switch.newOutput() + val switch = MaxMinFlowMultiplexer(scheduler) + val providerA = switch.newInput() + val providerB = switch.newInput() try { - switch.addInput(SimResourceSource(3200.0, scheduler)) + switch.addOutput(FlowSink(scheduler, 3200.0)) coroutineScope { launch { providerA.consume(workloadA) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt index 830f16d3..8396d346 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt @@ -20,24 +20,25 @@ * SOFTWARE. */ -package org.opendc.simulator.resources +package org.opendc.simulator.flow.source import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl /** - * A test suite for the [SimWorkConsumer] class. + * A test suite for the [FixedFlowSource] class. */ -internal class SimWorkConsumerTest { +internal class FixedFlowSourceTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val provider = SimResourceSource(1.0, scheduler) + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) - val consumer = SimWorkConsumer(1.0, 1.0) + val consumer = FixedFlowSource(1.0, 1.0) provider.consume(consumer) assertEquals(1000, clock.millis()) @@ -45,10 +46,10 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) - val provider = SimResourceSource(1.0, scheduler) + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) - val consumer = SimWorkConsumer(1.0, 0.5) + val consumer = FixedFlowSource(1.0, 0.5) provider.consume(consumer) assertEquals(2000, clock.millis()) diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts index eb9adcd1..a8f94602 100644 --- a/opendc-simulator/opendc-simulator-network/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts @@ -30,6 +30,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt index 102e5625..4b66d5cf 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowSource /** * A network port allows network devices to be connected to network through links. @@ -78,14 +78,14 @@ public abstract class SimNetworkPort { } /** - * Create a [SimResourceConsumer] which generates the outgoing traffic of this port. + * Create a [FlowSource] which generates the outgoing traffic of this port. */ - protected abstract fun createConsumer(): SimResourceConsumer + protected abstract fun createConsumer(): FlowSource /** - * The [SimResourceProvider] which processes the ingoing traffic of this port. + * The [FlowConsumer] which processes the ingoing traffic of this port. */ - protected abstract val provider: SimResourceProvider + protected abstract val provider: FlowConsumer override fun toString(): String = "SimNetworkPort[isConnected=$isConnected]" } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 7db0f176..4b0d7bbd 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -22,22 +22,22 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* /** * A network sink which discards all received traffic and does not generate any traffic itself. */ public class SimNetworkSink( - interpreter: SimResourceInterpreter, + engine: FlowEngine, public val capacity: Double ) : SimNetworkPort() { - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun createConsumer(): FlowSource = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } - override val provider: SimResourceProvider = SimResourceSource(capacity, interpreter) + override val provider: FlowConsumer = FlowSink(engine, capacity) override fun toString(): String = "SimNetworkSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt index 2267715e..2b7c1ad7 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt @@ -22,12 +22,13 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A [SimNetworkSwitch] that can support new networking ports on demand. */ -public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimNetworkSwitch { +public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetworkSwitch { /** * The ports of this switch. */ @@ -36,9 +37,9 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN private val _ports = mutableListOf<Port>() /** - * The [SimResourceSwitchMaxMin] to actually perform the switching. + * The [MaxMinFlowMultiplexer] to actually perform the switching. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val mux = MaxMinFlowMultiplexer(engine) /** * Open a new port on the switch. @@ -58,19 +59,19 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN */ private var isClosed: Boolean = false - override val provider: SimResourceProvider + override val provider: FlowConsumer get() = _provider - private val _provider = switch.newOutput() + private val _provider = mux.newInput() - override fun createConsumer(): SimResourceConsumer { - val forwarder = SimResourceForwarder(isCoupled = true) - switch.addInput(forwarder) + override fun createConsumer(): FlowSource { + val forwarder = FlowForwarder(engine, isCoupled = true) + mux.addOutput(forwarder) return forwarder } override fun close() { isClosed = true - switch.removeOutput(_provider) + mux.removeInput(_provider) _ports.remove(this) } } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index b8c4b00d..45d0bcf0 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -31,8 +31,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimNetworkSink] class. @@ -40,8 +40,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimNetworkSinkTest { @Test fun testInitialState() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertFalse(sink.isConnected) assertNull(sink.link) @@ -50,8 +50,8 @@ class SimNetworkSinkTest { @Test fun testDisconnectIdempotent() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertDoesNotThrow { sink.disconnect() } assertFalse(sink.isConnected) @@ -59,8 +59,8 @@ class SimNetworkSinkTest { @Test fun testConnectCircular() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertThrows<IllegalArgumentException> { sink.connect(sink) @@ -69,8 +69,8 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnectedTarget() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) val source = mockk<SimNetworkPort>(relaxUnitFun = true) every { source.isConnected } returns true @@ -81,9 +81,9 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source1 = Source(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source1 = Source(engine) val source2 = mockk<SimNetworkPort>(relaxUnitFun = true) @@ -97,9 +97,9 @@ class SimNetworkSinkTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) val consumer = source.consumer sink.connect(source) @@ -108,14 +108,14 @@ class SimNetworkSinkTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), SimResourceEvent.Start) } + verify { consumer.onEvent(any(), any(), FlowEvent.Start) } } @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) val consumer = source.consumer sink.connect(source) @@ -124,14 +124,14 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } - private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() { - val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8)) + private class Source(engine: FlowEngine) : SimNetworkPort() { + val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8)) - public override fun createConsumer(): SimResourceConsumer = consumer + public override fun createConsumer(): FlowSource = consumer - override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter) + override val provider: FlowConsumer = FlowSink(engine, 0.0) } } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt index 3a749bfe..4aa2fa92 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt @@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimNetworkSwitchVirtual] class. @@ -37,10 +37,10 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimNetworkSwitchVirtualTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) - val switch = SimNetworkSwitchVirtual(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) + val switch = SimNetworkSwitchVirtual(engine) val consumer = source.consumer switch.newPort().connect(sink) @@ -50,14 +50,14 @@ class SimNetworkSwitchVirtualTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), SimResourceEvent.Start) } + verify { consumer.onEvent(any(), any(), FlowEvent.Start) } } @Test fun testConnectClosedPort() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val switch = SimNetworkSwitchVirtual(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val switch = SimNetworkSwitchVirtual(engine) val port = switch.newPort() port.close() @@ -67,11 +67,11 @@ class SimNetworkSwitchVirtualTest { } } - private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() { - val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8)) + private class Source(engine: FlowEngine) : SimNetworkPort() { + val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8)) - public override fun createConsumer(): SimResourceConsumer = consumer + public override fun createConsumer(): FlowSource = consumer - override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter) + override val provider: FlowConsumer = FlowSink(engine, 0.0) } } diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts index f2a49964..e4342a6a 100644 --- a/opendc-simulator/opendc-simulator-power/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts @@ -30,6 +30,6 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt index 1a12a52a..c33f5186 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt @@ -22,46 +22,48 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.FlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A model of a Power Distribution Unit (PDU). * - * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood. + * @param engine The underlying [FlowEngine] to drive the simulation under the hood. * @param idlePower The idle power consumption of the PDU independent of the load on the PDU. * @param lossCoefficient The coefficient for the power loss of the PDU proportional to the square load. */ public class SimPdu( - interpreter: SimResourceInterpreter, + engine: FlowEngine, private val idlePower: Double = 0.0, private val lossCoefficient: Double = 0.0, ) : SimPowerInlet() { /** - * The [SimResourceSwitch] that distributes the electricity over the PDU outlets. + * The [FlowMultiplexer] that distributes the electricity over the PDU outlets. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val mux = MaxMinFlowMultiplexer(engine) /** - * The [SimResourceForwarder] that represents the input of the PDU. + * The [FlowForwarder] that represents the input of the PDU. */ - private val forwarder = SimResourceForwarder() + private val forwarder = FlowForwarder(engine) /** * Create a new PDU outlet. */ - public fun newOutlet(): Outlet = Outlet(switch, switch.newOutput()) + public fun newOutlet(): Outlet = Outlet(mux, mux.newInput()) init { - switch.addInput(forwarder) + mux.addOutput(forwarder) } - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by forwarder { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val duration = forwarder.onNext(ctx, now, delta) - val loss = computePowerLoss(ctx.demand) - val newLimit = ctx.demand + loss + override fun createConsumer(): FlowSource = object : FlowSource by forwarder { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val duration = forwarder.onPull(conn, now, delta) + val loss = computePowerLoss(conn.demand) + val newLimit = conn.demand + loss - ctx.push(newLimit) + conn.push(newLimit) return duration } @@ -81,7 +83,7 @@ public class SimPdu( /** * A PDU outlet. */ - public class Outlet(private val switch: SimResourceSwitch, private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable { + public class Outlet(private val switch: FlowMultiplexer, private val provider: FlowConsumer) : SimPowerOutlet(), AutoCloseable { override fun onConnect(inlet: SimPowerInlet) { provider.startConsumer(inlet.createConsumer()) } @@ -94,7 +96,7 @@ public class SimPdu( * Remove the outlet from the PDU. */ override fun close() { - switch.removeOutput(provider) + switch.removeInput(provider) } override fun toString(): String = "SimPdu.Outlet" diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt index 0ac1f199..851b28a5 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.SimResourceConsumer +import org.opendc.simulator.flow.FlowSource /** * An abstract inlet that consumes electricity from a power outlet. @@ -42,7 +42,7 @@ public abstract class SimPowerInlet { internal var _outlet: SimPowerOutlet? = null /** - * Create a [SimResourceConsumer] which represents the consumption of electricity from the power outlet. + * Create a [FlowSource] which represents the consumption of electricity from the power outlet. */ - public abstract fun createConsumer(): SimResourceConsumer + public abstract fun createConsumer(): FlowSource } diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt index 3ef8ccc6..7faebd75 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt @@ -22,25 +22,25 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.SimResourceSource +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowSink /** * A [SimPowerOutlet] that represents a source of electricity. * - * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood. + * @param engine The underlying [FlowEngine] to drive the simulation under the hood. */ -public class SimPowerSource(interpreter: SimResourceInterpreter, public val capacity: Double) : SimPowerOutlet() { +public class SimPowerSource(engine: FlowEngine, public val capacity: Double) : SimPowerOutlet() { /** * The resource source that drives this power source. */ - private val source = SimResourceSource(capacity, interpreter) + private val source = FlowSink(engine, capacity) /** * The power draw at this instant. */ public val powerDraw: Double - get() = source.speed + get() = source.rate override fun onConnect(inlet: SimPowerInlet) { source.startConsumer(inlet.createConsumer()) diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt index 9c7400ed..5eaa91af 100644 --- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt +++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt @@ -22,50 +22,51 @@ package org.opendc.simulator.power -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A model of an Uninterruptible Power Supply (UPS). * * This model aggregates multiple power sources into a single source in order to ensure that power is always available. * - * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood. + * @param engine The underlying [FlowEngine] to drive the simulation under the hood. * @param idlePower The idle power consumption of the UPS independent of the load. * @param lossCoefficient The coefficient for the power loss of the UPS proportional to the load. */ public class SimUps( - interpreter: SimResourceInterpreter, + private val engine: FlowEngine, private val idlePower: Double = 0.0, private val lossCoefficient: Double = 0.0, ) : SimPowerOutlet() { /** * The resource aggregator used to combine the input sources. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val switch = MaxMinFlowMultiplexer(engine) /** - * The [SimResourceProvider] that represents the output of the UPS. + * The [FlowConsumer] that represents the output of the UPS. */ - private val provider = switch.newOutput() + private val provider = switch.newInput() /** * Create a new UPS outlet. */ public fun newInlet(): SimPowerInlet { - val forward = SimResourceForwarder(isCoupled = true) - switch.addInput(forward) + val forward = FlowForwarder(engine, isCoupled = true) + switch.addOutput(forward) return Inlet(forward) } override fun onConnect(inlet: SimPowerInlet) { val consumer = inlet.createConsumer() - provider.startConsumer(object : SimResourceConsumer by consumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val duration = consumer.onNext(ctx, now, delta) - val loss = computePowerLoss(ctx.demand) - val newLimit = ctx.demand + loss + provider.startConsumer(object : FlowSource by consumer { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val duration = consumer.onPull(conn, now, delta) + val loss = computePowerLoss(conn.demand) + val newLimit = conn.demand + loss - ctx.push(newLimit) + conn.push(newLimit) return duration } }) @@ -86,8 +87,8 @@ public class SimUps( /** * A UPS inlet. */ - public inner class Inlet(private val forwarder: SimResourceForwarder) : SimPowerInlet(), AutoCloseable { - override fun createConsumer(): SimResourceConsumer = forwarder + public inner class Inlet(private val forwarder: FlowForwarder) : SimPowerInlet(), AutoCloseable { + override fun createConsumer(): FlowSource = forwarder /** * Remove the inlet from the PSU. diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index 17a174b7..568a1e8c 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceEvent -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimPdu] class. @@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimPduTest { @Test fun testZeroOutlets() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) assertEquals(0.0, source.powerDraw) @@ -49,9 +49,9 @@ internal class SimPduTest { @Test fun testSingleOutlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) pdu.newOutlet().connect(SimpleInlet()) @@ -60,9 +60,9 @@ internal class SimPduTest { @Test fun testDoubleOutlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) pdu.newOutlet().connect(SimpleInlet()) @@ -73,28 +73,28 @@ internal class SimPduTest { @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) - val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0)) + val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0)) val inlet = object : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = consumer + override fun createConsumer(): FlowSource = consumer } val outlet = pdu.newOutlet() outlet.connect(inlet) outlet.disconnect() - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testLoss() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) // https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN - val pdu = SimPdu(interpreter, idlePower = 1.5, lossCoefficient = 0.015) + val pdu = SimPdu(engine, idlePower = 1.5, lossCoefficient = 0.015) source.connect(pdu) pdu.newOutlet().connect(SimpleInlet()) assertEquals(89.0, source.powerDraw, 0.01) @@ -102,9 +102,9 @@ internal class SimPduTest { @Test fun testOutletClose() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val pdu = SimPdu(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val pdu = SimPdu(engine) source.connect(pdu) val outlet = pdu.newOutlet() outlet.close() @@ -115,6 +115,6 @@ internal class SimPduTest { } class SimpleInlet : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5) + override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5) } } diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index f3829ba1..b411e292 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -31,10 +31,10 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceEvent -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimPowerSource] @@ -42,8 +42,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimPowerSourceTest { @Test fun testInitialState() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) assertFalse(source.isConnected) assertNull(source.inlet) @@ -52,8 +52,8 @@ internal class SimPowerSourceTest { @Test fun testDisconnectIdempotent() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) assertDoesNotThrow { source.disconnect() } assertFalse(source.isConnected) @@ -61,8 +61,8 @@ internal class SimPowerSourceTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = SimpleInlet() source.connect(inlet) @@ -76,27 +76,27 @@ internal class SimPowerSourceTest { @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0)) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0)) val inlet = object : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = consumer + override fun createConsumer(): FlowSource = consumer } source.connect(inlet) source.disconnect() - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } @Test fun testDisconnectAssertion() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = mockk<SimPowerInlet>(relaxUnitFun = true) every { inlet.isConnected } returns false every { inlet._outlet } returns null - every { inlet.createConsumer() } returns SimWorkConsumer(100.0, utilization = 1.0) + every { inlet.createConsumer() } returns FixedFlowSource(100.0, utilization = 1.0) source.connect(inlet) @@ -107,8 +107,8 @@ internal class SimPowerSourceTest { @Test fun testOutletAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = SimpleInlet() source.connect(inlet) @@ -121,8 +121,8 @@ internal class SimPowerSourceTest { @Test fun testInletAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) val inlet = mockk<SimPowerInlet>(relaxUnitFun = true) every { inlet.isConnected } returns true @@ -132,6 +132,6 @@ internal class SimPowerSourceTest { } class SimpleInlet : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 1.0) + override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 1.0) } } diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index 8d5fa857..31ac0b39 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceEvent -import org.opendc.simulator.resources.SimResourceInterpreter -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.FlowEngine +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimUps] class. @@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimUpsTest { @Test fun testSingleInlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) - val ups = SimUps(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) + val ups = SimUps(engine) source.connect(ups.newInlet()) ups.connect(SimpleInlet()) @@ -50,10 +50,10 @@ internal class SimUpsTest { @Test fun testDoubleInlet() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source1 = SimPowerSource(interpreter, capacity = 100.0) - val source2 = SimPowerSource(interpreter, capacity = 100.0) - val ups = SimUps(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source1 = SimPowerSource(engine, capacity = 100.0) + val source2 = SimPowerSource(engine, capacity = 100.0) + val ups = SimUps(engine) source1.connect(ups.newInlet()) source2.connect(ups.newInlet()) @@ -67,10 +67,10 @@ internal class SimUpsTest { @Test fun testLoss() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source = SimPowerSource(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val source = SimPowerSource(engine, capacity = 100.0) // https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN - val ups = SimUps(interpreter, idlePower = 4.0, lossCoefficient = 0.05) + val ups = SimUps(engine, idlePower = 4.0, lossCoefficient = 0.05) source.connect(ups.newInlet()) ups.connect(SimpleInlet()) @@ -79,24 +79,24 @@ internal class SimUpsTest { @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val source1 = SimPowerSource(interpreter, capacity = 100.0) - val source2 = SimPowerSource(interpreter, capacity = 100.0) - val ups = SimUps(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val source1 = SimPowerSource(engine, capacity = 100.0) + val source2 = SimPowerSource(engine, capacity = 100.0) + val ups = SimUps(engine) source1.connect(ups.newInlet()) source2.connect(ups.newInlet()) - val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0)) + val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0)) val inlet = object : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = consumer + override fun createConsumer(): FlowSource = consumer } ups.connect(inlet) ups.disconnect() - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } } class SimpleInlet : SimPowerInlet() { - override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5) + override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt deleted file mode 100644 index b406b896..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -/** - * A controllable [SimResourceContext]. - * - * This interface is used by resource providers to control the resource context. - */ -public interface SimResourceControllableContext : SimResourceContext { - /** - * The capacity of the resource. - */ - public override var capacity: Double - - /** - * Start the resource context. - */ - public fun start() - - /** - * Invalidate the resource context's state. - * - * By invalidating the resource context's current state, the state is re-computed and the current progress is - * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the - * synchronous variant. - */ - public fun invalidate() - - /** - * Synchronously flush the progress of the resource context and materialize its current progress. - */ - public fun flush() -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt deleted file mode 100644 index f1e004d2..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.interference.InterferenceKey -import java.util.ArrayDeque - -/** - * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that - * a single output is directly connected to an input and that the switch can only support as many outputs as inputs. - */ -public class SimResourceSwitchExclusive : SimResourceSwitch { - override val outputs: Set<SimResourceProvider> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - - private val _inputs = mutableSetOf<SimResourceProvider>() - override val inputs: Set<SimResourceProvider> - get() = _inputs - private val _availableInputs = ArrayDeque<SimResourceForwarder>() - - override val counters: SimResourceCounters = object : SimResourceCounters { - override val demand: Double - get() = _inputs.sumOf { it.counters.demand } - override val actual: Double - get() = _inputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _inputs.sumOf { it.counters.overcommit } - override val interference: Double - get() = _inputs.sumOf { it.counters.interference } - - override fun reset() { - for (input in _inputs) { - input.counters.reset() - } - } - - override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" - } - - /** - * Add an output to the switch. - */ - override fun newOutput(key: InterferenceKey?): SimResourceProvider { - val forwarder = checkNotNull(_availableInputs.poll()) { "No capacity to serve request" } - val output = Output(forwarder) - _outputs += output - return output - } - - override fun removeOutput(output: SimResourceProvider) { - if (!_outputs.remove(output)) { - return - } - - (output as Output).close() - } - - /** - * Add an input to the switch. - */ - override fun addInput(input: SimResourceProvider) { - if (input in inputs) { - return - } - - val forwarder = SimResourceForwarder() - - _inputs += input - _availableInputs += forwarder - - input.startConsumer(object : SimResourceConsumer by forwarder { - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - if (event == SimResourceEvent.Exit) { - // De-register the input after it has finished - _inputs -= input - } - - forwarder.onEvent(ctx, event) - } - }) - } - - override fun clear() { - for (input in _inputs) { - input.cancel() - } - _inputs.clear() - - // Outputs are implicitly cancelled by the inputs forwarders - _outputs.clear() - } - - /** - * An output of the resource switch. - */ - private inner class Output(private val forwarder: SimResourceForwarder) : SimResourceProvider by forwarder { - /** - * Close the output. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across output resources. - _outputs -= this - _availableInputs += forwarder - } - - override fun toString(): String = "SimResourceSwitchExclusive.Output" - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt deleted file mode 100644 index 574fb443..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.impl.SimResourceCountersImpl -import org.opendc.simulator.resources.interference.InterferenceDomain -import org.opendc.simulator.resources.interference.InterferenceKey -import kotlin.math.max -import kotlin.math.min - -/** - * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min - * fair sharing. - * - * @param interpreter The interpreter for managing the resource contexts. - * @param parent The parent resource system of the switch. - * @param interferenceDomain The interference domain of the switch. - */ -public class SimResourceSwitchMaxMin( - private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem? = null, - private val interferenceDomain: InterferenceDomain? = null -) : SimResourceSwitch { - /** - * The output resource providers to which resource consumers can be attached. - */ - override val outputs: Set<SimResourceProvider> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - private val _activeOutputs: MutableList<Output> = mutableListOf() - - /** - * The input resources that will be switched between the output providers. - */ - override val inputs: Set<SimResourceProvider> - get() = _inputs - private val _inputs = mutableSetOf<SimResourceProvider>() - private val _activeInputs = mutableListOf<Input>() - - /** - * The resource counters of this switch. - */ - public override val counters: SimResourceCounters - get() = _counters - private val _counters = SimResourceCountersImpl() - - /** - * The actual processing rate of the switch. - */ - private var _rate = 0.0 - - /** - * The demanded processing rate of the outputs. - */ - private var _demand = 0.0 - - /** - * The capacity of the switch. - */ - private var _capacity = 0.0 - - /** - * Flag to indicate that the scheduler is active. - */ - private var _schedulerActive = false - - /** - * Add an output to the switch. - */ - override fun newOutput(key: InterferenceKey?): SimResourceProvider { - val provider = Output(_capacity, key) - _outputs.add(provider) - return provider - } - - /** - * Add the specified [input] to the switch. - */ - override fun addInput(input: SimResourceProvider) { - val consumer = Input(input) - if (_inputs.add(input)) { - _activeInputs.add(consumer) - input.startConsumer(consumer) - } - } - - /** - * Remove [output] from this switch. - */ - override fun removeOutput(output: SimResourceProvider) { - if (!_outputs.remove(output)) { - return - } - // This cast should always succeed since only `Output` instances should be added to _outputs - (output as Output).close() - } - - override fun clear() { - for (input in _activeInputs) { - input.cancel() - } - _activeInputs.clear() - - for (output in _activeOutputs) { - output.cancel() - } - _activeOutputs.clear() - } - - /** - * Run the scheduler of the switch. - */ - private fun runScheduler(now: Long) { - if (_schedulerActive) { - return - } - - _schedulerActive = true - try { - doSchedule(now) - } finally { - _schedulerActive = false - } - } - - /** - * Schedule the outputs over the input. - */ - private fun doSchedule(now: Long) { - // If there is no work yet, mark the input as idle. - if (_activeOutputs.isEmpty()) { - return - } - - val capacity = _capacity - var availableCapacity = capacity - - // Pull in the work of the outputs - val outputIterator = _activeOutputs.listIterator() - for (output in outputIterator) { - output.pull(now) - - // Remove outputs that have finished - if (!output.isActive) { - outputIterator.remove() - } - } - - var demand = 0.0 - - // Sort in-place the outputs based on their requested usage. - // Profiling shows that it is faster than maintaining some kind of sorted set. - _activeOutputs.sort() - - // Divide the available input capacity fairly across the outputs using max-min fair sharing - var remaining = _activeOutputs.size - for (output in _activeOutputs) { - val availableShare = availableCapacity / remaining-- - val grantedSpeed = min(output.allowedRate, availableShare) - - // Ignore idle computation - if (grantedSpeed <= 0.0) { - output.actualRate = 0.0 - continue - } - - demand += output.limit - - output.actualRate = grantedSpeed - availableCapacity -= grantedSpeed - } - - val rate = capacity - availableCapacity - - _demand = demand - _rate = rate - - // Sort all consumers by their capacity - _activeInputs.sort() - - // Divide the requests over the available capacity of the input resources fairly - for (input in _activeInputs) { - val inputCapacity = input.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - input.push(grantedSpeed) - } - } - - /** - * Recompute the capacity of the switch. - */ - private fun updateCapacity() { - val newCapacity = _activeInputs.sumOf(Input::capacity) - - // No-op if the capacity is unchanged - if (_capacity == newCapacity) { - return - } - - _capacity = newCapacity - - for (output in _outputs) { - output.capacity = newCapacity - } - } - - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class Output(capacity: Double, val key: InterferenceKey?) : - SimAbstractResourceProvider(interpreter, capacity), - SimResourceProviderLogic, - Comparable<Output> { - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - - /** - * The actual processing speed. - */ - @JvmField var actualRate: Double = 0.0 - - /** - * The processing speed that is allowed by the model constraints. - */ - val allowedRate: Double - get() = min(capacity, limit) - - /** - * A flag to indicate that the output is closed. - */ - private var _isClosed: Boolean = false - - /** - * The timestamp at which we received the last command. - */ - private var _lastPull: Long = Long.MIN_VALUE - - /** - * Close the output. - * - * This method is invoked when the user removes an output from the switch. - */ - fun close() { - _isClosed = true - cancel() - } - - /* SimAbstractResourceProvider */ - override fun createLogic(): SimResourceProviderLogic = this - - override fun start(ctx: SimResourceControllableContext) { - check(!_isClosed) { "Cannot re-use closed output" } - - _activeOutputs += this - super.start(ctx) - } - - /* SimResourceProviderLogic */ - override fun onConsume( - ctx: SimResourceControllableContext, - now: Long, - delta: Long, - limit: Double, - duration: Long - ) { - doUpdateCounters(delta) - - actualRate = 0.0 - this.limit = limit - _lastPull = now - - runScheduler(now) - } - - override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { - parent?.onConverge(now) - } - - override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { - doUpdateCounters(delta) - - limit = 0.0 - actualRate = 0.0 - _lastPull = now - } - - /* Comparable */ - override fun compareTo(other: Output): Int = allowedRate.compareTo(other.allowedRate) - - /** - * Pull the next command if necessary. - */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && _lastPull < now) { - ctx.flush() - } - } - - /** - * Helper method to update the resource counters of the distributor. - */ - private fun doUpdateCounters(delta: Long) { - if (delta <= 0L) { - return - } - - // Compute the performance penalty due to resource interference - val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity - interferenceDomain.apply(key, load) - } else { - 1.0 - } - - val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork - - updateCounters(work, actualWork, remainingWork) - - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) - } - } - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private inner class Input(private val provider: SimResourceProvider) : SimResourceConsumer, Comparable<Input> { - /** - * The active [SimResourceContext] of this consumer. - */ - private var _ctx: SimResourceContext? = null - - /** - * The capacity of this input. - */ - val capacity: Double - get() = _ctx?.capacity ?: 0.0 - - /** - * Push the specified rate to the provider. - */ - fun push(rate: Double) { - _ctx?.push(rate) - } - - /** - * Cancel this input. - */ - fun cancel() { - provider.cancel() - } - - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - runScheduler(now) - return Long.MAX_VALUE - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - assert(_ctx == null) { "Consumer running concurrently" } - _ctx = ctx - updateCapacity() - } - SimResourceEvent.Exit -> { - _ctx = null - updateCapacity() - } - SimResourceEvent.Capacity -> updateCapacity() - else -> {} - } - } - - override fun compareTo(other: Input): Int = capacity.compareTo(other.capacity) - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt deleted file mode 100644 index 1428ce42..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import io.mockk.* -import kotlinx.coroutines.* -import org.junit.jupiter.api.* -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.simulator.resources.impl.SimResourceContextImpl -import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl - -/** - * A test suite for the [SimResourceContextImpl] class. - */ -class SimResourceContextTest { - @Test - fun testFlushWithoutCommand() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(1.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } - - val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(interpreter, consumer, logic) - - interpreter.scheduleSync(interpreter.clock.millis(), context) - } - - @Test - fun testIntermediateFlush() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = SimWorkConsumer(1.0, 1.0) - - val logic = spyk(object : SimResourceProviderLogic {}) - val context = SimResourceContextImpl(interpreter, consumer, logic) - context.capacity = 1.0 - - context.start() - delay(1) // Delay 1 ms to prevent hitting the fast path - interpreter.scheduleSync(interpreter.clock.millis(), context) - - verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } - } - - @Test - fun testIntermediateFlushIdle() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = SimWorkConsumer(1.0, 1.0) - - val logic = spyk(object : SimResourceProviderLogic {}) - val context = SimResourceContextImpl(interpreter, consumer, logic) - context.capacity = 1.0 - - context.start() - delay(500) - context.invalidate() - delay(500) - context.invalidate() - - assertAll( - { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } }, - { verify(exactly = 1) { logic.onFinish(any(), any(), any()) } } - ) - } - - @Test - fun testDoubleStart() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(0.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } - - val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(interpreter, consumer, logic) - - context.start() - - assertThrows<IllegalStateException> { - context.start() - } - } - - @Test - fun testIdempotentCapacityChange() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(1.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - }) - - val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(interpreter, consumer, logic) - context.capacity = 4200.0 - context.start() - context.capacity = 4200.0 - - verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) } - } - - @Test - fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - - val consumer = spyk(object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - ctx.close() - return Long.MAX_VALUE - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent") - } - - override fun onFailure(ctx: SimResourceContext, cause: Throwable) { - throw IllegalStateException("onFailure") - } - }) - - val logic = object : SimResourceProviderLogic {} - - val context = SimResourceContextImpl(interpreter, consumer, logic) - - context.start() - - delay(1) - - verify(exactly = 1) { consumer.onFailure(any(), any()) } - } -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 96b300d7..59308e11 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -123,7 +123,7 @@ class RunnerCli : CliktCommand(name = "runner") { .default(60L * 3) // Experiment may run for a maximum of three minutes /** - * Run a single scenario. + * Converge a single scenario. */ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> { val id = scenario.id @@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") { } /** - * Run a single repeat. + * Converge a single repeat. */ private suspend fun runRepeat( scenario: Scenario, @@ -199,7 +199,7 @@ class RunnerCli : CliktCommand(name = "runner") { try { // Instantiate the topology onto the simulator simulator.apply(topology) - // Run workload trace + // Converge workload trace simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { simulator.close() diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 992b4991..04f54e58 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -42,7 +42,7 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.SimResourceInterpreter +import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.sdk.toOtelClock import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -70,7 +70,7 @@ internal class WorkflowServiceTest { .setClock(clock.toOtelClock()) .build() - val interpreter = SimResourceInterpreter(coroutineContext, clock) + val interpreter = FlowEngine(coroutineContext, clock) val machineModel = createMachineModel() val hvProvider = SimSpaceSharedHypervisorProvider() val hosts = List(4) { id -> diff --git a/settings.gradle.kts b/settings.gradle.kts index 587f1cb2..d9b3d940 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -38,7 +38,7 @@ include(":opendc-web:opendc-web-api") include(":opendc-web:opendc-web-ui") include(":opendc-web:opendc-web-runner") include(":opendc-simulator:opendc-simulator-core") -include(":opendc-simulator:opendc-simulator-resources") +include(":opendc-simulator:opendc-simulator-flow") include(":opendc-simulator:opendc-simulator-power") include(":opendc-simulator:opendc-simulator-network") include(":opendc-simulator:opendc-simulator-compute") |
