summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-29 23:56:16 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:39 +0200
commit4cc1d40d421c8736f8b21b360b61d6b065158b7a (patch)
treecb2de79a72881eb0b2dee6a82dd498faba5dd26d /opendc-simulator
parentdd605ab1f70fef1fbbed848e8ebbd6b231622273 (diff)
refactor(simulator): Migrate to flow-based simulation
This change renames the `opendc-simulator-resources` module into the `opendc-simulator-flow` module to indicate that the core simulation model of OpenDC is based around modelling and simulating flows. Previously, the distinction between resource consumer and provider, and input and output caused some confusion. By switching to a flow-based model, this distinction is now clear (as in, the water flows from source to consumer/sink).
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt20
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt49
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt24
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt26
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt48
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt36
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts (renamed from opendc-simulator/opendc-simulator-resources/build.gradle.kts)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt (renamed from opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt)78
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt)63
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt)30
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt)55
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt45
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt)24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt)16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt)25
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt)16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt)97
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt)34
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt)31
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt)6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt)79
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt)10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt)77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt)37
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt127
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt399
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt)32
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt)36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt)29
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt152
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt)110
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt)112
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt)83
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt)72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt)23
-rw-r--r--opendc-simulator/opendc-simulator-network/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt12
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt10
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt21
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt50
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt28
-rw-r--r--opendc-simulator/opendc-simulator-power/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt36
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt6
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt12
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt33
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt52
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt46
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt44
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt54
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt129
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt407
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt173
82 files changed, 1711 insertions, 1765 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index 7d06ee62..e2290a14 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -31,7 +31,7 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcSimulator.opendcSimulatorResources)
+ api(projects.opendcSimulator.opendcSimulatorFlow)
api(projects.opendcSimulator.opendcSimulatorPower)
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 88ad7286..c57919c1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -36,7 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
@@ -48,13 +48,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var interpreter: SimResourceInterpreter
+ private lateinit var engine: FlowEngine
private lateinit var machineModel: MachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
+ engine = FlowEngine(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -80,7 +80,7 @@ class SimMachineBenchmarks {
fun benchmarkBareMetal(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace))
}
@@ -90,9 +90,9 @@ class SimMachineBenchmarks {
fun benchmarkSpaceSharedHypervisor(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
@@ -111,9 +111,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorSingle(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(interpreter)
+ val hypervisor = SimFairShareHypervisor(engine)
launch { machine.run(hypervisor) }
@@ -132,9 +132,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorDouble(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(interpreter)
+ val hypervisor = SimFairShareHypervisor(engine)
launch { machine.run(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index f9db048d..6a62d8a5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -30,22 +30,22 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.NetworkAdapter
import org.opendc.simulator.compute.model.StorageDevice
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
*
- * @param interpreter The interpreter to manage the machine's resources.
+ * @param engine The engine to manage the machine's resources.
* @param parent The parent simulation system.
* @param model The model of the machine.
*/
public abstract class SimAbstractMachine(
- protected val interpreter: SimResourceInterpreter,
- final override val parent: SimResourceSystem?,
+ protected val engine: FlowEngine,
+ final override val parent: FlowSystem?,
final override val model: MachineModel
-) : SimMachine, SimResourceSystem {
+) : SimMachine, FlowSystem {
/**
* The resources allocated for this machine.
*/
@@ -54,17 +54,17 @@ public abstract class SimAbstractMachine(
/**
* The memory interface of the machine.
*/
- public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory)
+ public val memory: SimMemory = Memory(FlowSink(engine, model.memory.sumOf { it.size }.toDouble()), model.memory)
/**
* The network interfaces available to the machine.
*/
- public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) }
+ public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(engine, adapter, i) }
/**
* The network interfaces available to the machine.
*/
- public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) }
+ public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(engine, device, i) }
/**
* The peripherals of the machine.
@@ -82,7 +82,7 @@ public abstract class SimAbstractMachine(
private var cont: Continuation<Unit>? = null
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
check(!isTerminated) { "Machine is terminated" }
@@ -96,14 +96,14 @@ public abstract class SimAbstractMachine(
// Cancel all cpus on cancellation
cont.invokeOnCancellation {
this.cont = null
- interpreter.batch {
+ engine.batch {
for (cpu in cpus) {
cpu.cancel()
}
}
}
- interpreter.batch { workload.onStart(ctx) }
+ engine.batch { workload.onStart(ctx) }
}
}
@@ -120,7 +120,7 @@ public abstract class SimAbstractMachine(
* Cancel the workload that is currently running on the machine.
*/
private fun cancel() {
- interpreter.batch {
+ engine.batch {
for (cpu in cpus) {
cpu.cancel()
}
@@ -137,8 +137,8 @@ public abstract class SimAbstractMachine(
* The execution context in which the workload runs.
*/
private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val interpreter: SimResourceInterpreter
- get() = this@SimAbstractMachine.interpreter
+ override val engine: FlowEngine
+ get() = this@SimAbstractMachine.engine
override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
@@ -154,7 +154,7 @@ public abstract class SimAbstractMachine(
/**
* The [SimMemory] implementation for a machine.
*/
- private class Memory(source: SimResourceSource, override val models: List<MemoryUnit>) : SimMemory, SimResourceProvider by source {
+ private class Memory(source: FlowSink, override val models: List<MemoryUnit>) : SimMemory, FlowConsumer by source {
override fun toString(): String = "SimAbstractMachine.Memory"
}
@@ -162,6 +162,7 @@ public abstract class SimAbstractMachine(
* The [SimNetworkAdapter] implementation for a machine.
*/
private class NetworkAdapterImpl(
+ private val engine: FlowEngine,
model: NetworkAdapter,
index: Int
) : SimNetworkAdapter(), SimNetworkInterface {
@@ -169,18 +170,18 @@ public abstract class SimAbstractMachine(
override val bandwidth: Double = model.bandwidth
- override val provider: SimResourceProvider
+ override val provider: FlowConsumer
get() = _rx
- override fun createConsumer(): SimResourceConsumer = _tx
+ override fun createConsumer(): FlowSource = _tx
- override val tx: SimResourceProvider
+ override val tx: FlowConsumer
get() = _tx
- private val _tx = SimResourceForwarder()
+ private val _tx = FlowForwarder(engine)
- override val rx: SimResourceConsumer
+ override val rx: FlowSource
get() = _rx
- private val _rx = SimResourceForwarder()
+ private val _rx = FlowForwarder(engine)
override fun toString(): String = "SimAbstractMachine.NetworkAdapterImpl[name=$name,bandwidth=$bandwidth]"
}
@@ -189,7 +190,7 @@ public abstract class SimAbstractMachine(
* The [SimStorageInterface] implementation for a machine.
*/
private class StorageDeviceImpl(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
model: StorageDevice,
index: Int
) : SimStorageInterface {
@@ -197,9 +198,9 @@ public abstract class SimAbstractMachine(
override val capacity: Double = model.capacity
- override val read: SimResourceProvider = SimResourceSource(model.readBandwidth, interpreter)
+ override val read: FlowConsumer = FlowSink(engine, model.readBandwidth)
- override val write: SimResourceProvider = SimResourceSource(model.writeBandwidth, interpreter)
+ override val write: FlowConsumer = FlowSink(engine, model.writeBandwidth)
override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 639ca450..37cf282b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -26,8 +26,8 @@ import org.opendc.simulator.compute.device.SimPsu
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerDriver
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.FlowEngine
/**
* A simulated bare-metal machine that is able to run a single workload.
@@ -35,19 +35,19 @@ import org.opendc.simulator.resources.SimResourceInterpreter
* A [SimBareMetalMachine] is a stateful object, and you should be careful when operating this object concurrently. For
* example, the class expects only a single concurrent call to [run].
*
- * @param interpreter The [SimResourceInterpreter] to drive the simulation.
+ * @param engine The [FlowEngine] to drive the simulation.
* @param model The machine model to simulate.
* @param powerDriver The power driver to use.
* @param psu The power supply of the machine.
* @param parent The parent simulation system.
*/
public class SimBareMetalMachine(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
model: MachineModel,
powerDriver: PowerDriver,
public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)),
- parent: SimResourceSystem? = null,
-) : SimAbstractMachine(interpreter, parent, model) {
+ parent: FlowSystem? = null,
+) : SimAbstractMachine(engine, parent, model) {
/**
* The power draw of the machine onto the PSU.
*/
@@ -58,7 +58,7 @@ public class SimBareMetalMachine(
* The processing units of the machine.
*/
override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu ->
- Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu)
+ Cpu(FlowSink(engine, cpu.frequency, this@SimBareMetalMachine), cpu)
}
/**
@@ -78,9 +78,9 @@ public class SimBareMetalMachine(
* A [SimProcessingUnit] of a bare-metal machine.
*/
private class Cpu(
- private val source: SimResourceSource,
+ private val source: FlowSink,
override val model: ProcessingUnit
- ) : SimProcessingUnit, SimResourceProvider by source {
+ ) : SimProcessingUnit, FlowConsumer by source {
override var capacity: Double
get() = source.capacity
set(value) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index d8dd8205..ab0b56ae 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -41,7 +41,7 @@ public interface SimMachine : AutoCloseable {
public val peripherals: List<SimPeripheral>
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index 6996a30d..1317f728 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* A simulated execution context in which a bootable image runs. This interface represents the
@@ -31,9 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter
*/
public interface SimMachineContext : AutoCloseable {
/**
- * The resource interpreter that simulates the machine.
+ * The [FlowEngine] that simulates the machine.
*/
- public val interpreter: SimResourceInterpreter
+ public val engine: FlowEngine
/**
* The metadata associated with the context.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt
index 6623df23..b1aef495 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt
@@ -23,12 +23,12 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
/**
* An interface to control the memory usage of simulated workloads.
*/
-public interface SimMemory : SimResourceProvider {
+public interface SimMemory : FlowConsumer {
/**
* The models representing the static information of the memory units supporting this interface.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt
index 1ac126ae..660b2871 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowSource
/**
* A firmware interface to a network adapter.
@@ -42,10 +42,10 @@ public interface SimNetworkInterface {
/**
* The resource provider for the transmit channel of the network interface.
*/
- public val tx: SimResourceProvider
+ public val tx: FlowConsumer
/**
* The resource consumer for the receive channel of the network interface.
*/
- public val rx: SimResourceConsumer
+ public val rx: FlowSource
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
index 93c9ddfa..c9f36ece 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
@@ -23,12 +23,12 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
/**
* A simulated processing unit.
*/
-public interface SimProcessingUnit : SimResourceProvider {
+public interface SimProcessingUnit : FlowConsumer {
/**
* The capacity of the processing unit, which can be adjusted by the workload if supported by the machine.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt
index 21a801f1..3d648671 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
/**
* A firmware interface to a storage device.
@@ -41,10 +41,10 @@ public interface SimStorageInterface {
/**
* The resource provider for the read operations of the storage device.
*/
- public val read: SimResourceProvider
+ public val read: FlowConsumer
/**
* The resource consumer for the write operation of the storage device.
*/
- public val write: SimResourceProvider
+ public val write: FlowConsumer
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 6e6e590f..b05d8ad9 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -23,10 +23,10 @@
package org.opendc.simulator.compute.device
import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
import org.opendc.simulator.power.SimPowerInlet
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
import java.util.*
/**
@@ -54,7 +54,7 @@ public class SimPsu(
/**
* The consumer context.
*/
- private var _ctx: SimResourceContext? = null
+ private var _ctx: FlowConnection? = null
/**
* The driver that is connected to the PSU.
@@ -69,7 +69,7 @@ public class SimPsu(
* Update the power draw of the PSU.
*/
public fun update() {
- _ctx?.interrupt()
+ _ctx?.pull()
}
/**
@@ -81,18 +81,18 @@ public class SimPsu(
update()
}
- override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun createConsumer(): FlowSource = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
- ctx.push(powerDraw)
+ conn.push(powerDraw)
return Long.MAX_VALUE
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> _ctx = ctx
- SimResourceEvent.Run -> _powerDraw = ctx.speed
- SimResourceEvent.Exit -> _ctx = null
+ FlowEvent.Start -> _ctx = conn
+ FlowEvent.Converge -> _powerDraw = conn.rate
+ FlowEvent.Exit -> _ctx = null
else -> {}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index cf9e3230..b145eefc 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -28,17 +28,17 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.SimResourceSwitch
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.FlowMultiplexer
/**
* Abstract implementation of the [SimHypervisor] interface.
*
- * @param interpreter The resource interpreter to use.
+ * @param engine The [FlowEngine] to drive the simulation.
* @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware.
*/
public abstract class SimAbstractHypervisor(
- private val interpreter: SimResourceInterpreter,
+ protected val engine: FlowEngine,
private val scalingGovernor: ScalingGovernor? = null,
protected val interferenceDomain: VmInterferenceDomain? = null
) : SimHypervisor {
@@ -50,7 +50,7 @@ public abstract class SimAbstractHypervisor(
/**
* The resource switch to use.
*/
- private lateinit var switch: SimResourceSwitch
+ private lateinit var mux: FlowMultiplexer
/**
* The virtual machines running on this hypervisor.
@@ -62,8 +62,8 @@ public abstract class SimAbstractHypervisor(
/**
* The resource counters associated with the hypervisor.
*/
- public override val counters: SimResourceCounters
- get() = switch.counters
+ public override val counters: FlowCounters
+ get() = mux.counters
/**
* The scaling governors attached to the physical CPUs backing this hypervisor.
@@ -71,14 +71,14 @@ public abstract class SimAbstractHypervisor(
private val governors = mutableListOf<ScalingGovernor.Logic>()
/**
- * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs.
+ * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs.
*/
- public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch
+ public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer
/**
* Check whether the specified machine model fits on this hypervisor.
*/
- public abstract fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean
+ public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean
/**
* Trigger the governors to recompute the scaling limits.
@@ -91,7 +91,7 @@ public abstract class SimAbstractHypervisor(
/* SimHypervisor */
override fun canFit(model: MachineModel): Boolean {
- return canFit(model, switch)
+ return canFit(model, mux)
}
override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine {
@@ -104,7 +104,7 @@ public abstract class SimAbstractHypervisor(
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
- switch = createSwitch(ctx)
+ mux = createMultiplexer(ctx)
for (cpu in ctx.cpus) {
val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu))
@@ -113,7 +113,7 @@ public abstract class SimAbstractHypervisor(
governor.onStart()
}
- switch.addInput(cpu)
+ mux.addOutput(cpu)
}
}
@@ -122,7 +122,7 @@ public abstract class SimAbstractHypervisor(
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) {
+ private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) {
/**
* The interference key of this virtual machine.
*/
@@ -131,7 +131,7 @@ public abstract class SimAbstractHypervisor(
/**
* The vCPUs of the machine.
*/
- override val cpus = model.cpus.map { VCpu(switch, switch.newOutput(interferenceKey), it) }
+ override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) }
override fun close() {
super.close()
@@ -153,10 +153,10 @@ public abstract class SimAbstractHypervisor(
* A [SimProcessingUnit] of a virtual machine.
*/
private class VCpu(
- private val switch: SimResourceSwitch,
- private val source: SimResourceProvider,
+ private val switch: FlowMultiplexer,
+ private val source: FlowConsumer,
override val model: ProcessingUnit
- ) : SimProcessingUnit, SimResourceProvider by source {
+ ) : SimProcessingUnit, FlowConsumer by source {
override var capacity: Double
get() = source.capacity
set(_) {
@@ -169,7 +169,7 @@ public abstract class SimAbstractHypervisor(
* Close the CPU
*/
fun close() {
- switch.removeOutput(source)
+ switch.removeInput(source)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index 3b44292d..36ab7c1c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -28,39 +28,39 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSwitch
-import org.opendc.simulator.resources.SimResourceSwitchMaxMin
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine]
* concurrently using weighted fair sharing.
*
- * @param interpreter The interpreter to manage the machine's resources.
+ * @param engine The [FlowEngine] to manage the machine's resources.
* @param parent The parent simulation system.
* @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
* @param interferenceDomain The resource interference domain to which the hypervisor belongs.
* @param listener The hypervisor listener to use.
*/
public class SimFairShareHypervisor(
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
+ engine: FlowEngine,
+ private val parent: FlowSystem? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
private val listener: SimHypervisor.Listener? = null
-) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) {
+) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) {
- override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true
+ override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true
- override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
+ override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
return SwitchSystem(ctx).switch
}
- private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem {
- val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain)
+ private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowSystem {
+ val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
- override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
+ override val parent: FlowSystem? = this@SimFairShareHypervisor.parent
private var lastCpuUsage = 0.0
private var lastCpuDemand = 0.0
@@ -87,8 +87,8 @@ public class SimFairShareHypervisor(
}
lastReport = timestamp
- lastCpuDemand = switch.inputs.sumOf { it.demand }
- lastCpuUsage = switch.inputs.sumOf { it.speed }
+ lastCpuDemand = switch.outputs.sumOf { it.demand }
+ lastCpuUsage = switch.outputs.sumOf { it.rate }
lastDemand = counters.demand
lastActual = counters.actual
lastOvercommit = counters.overcommit
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
index 8d0592ec..bfa099fb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -34,13 +34,13 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
override fun create(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem?,
+ engine: FlowEngine,
+ parent: FlowSystem?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
): SimHypervisor = SimFairShareHypervisor(
- interpreter,
+ engine,
parent,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 3b49d515..1b11ca6b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -25,7 +25,7 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceCounters
+import org.opendc.simulator.flow.FlowCounters
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
@@ -40,7 +40,7 @@ public interface SimHypervisor : SimWorkload {
/**
* The resource counters associated with the hypervisor.
*/
- public val counters: SimResourceCounters
+ public val counters: FlowCounters
/**
* Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
index b307a34d..97f07097 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -43,8 +43,8 @@ public interface SimHypervisorProvider {
* Create a [SimHypervisor] instance with the specified [listener].
*/
public fun create(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null,
+ engine: FlowEngine,
+ parent: FlowSystem? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
listener: SimHypervisor.Listener? = null
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
index ac1c0250..883e0d82 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
@@ -24,19 +24,19 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSwitch
-import org.opendc.simulator.resources.SimResourceSwitchExclusive
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) {
- override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean {
- return switch.inputs.size - switch.outputs.size >= model.cpus.size
+public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) {
+ override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean {
+ return switch.outputs.size - switch.inputs.size >= model.cpus.size
}
- override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
- return SimResourceSwitchExclusive()
+ override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
+ return ForwardingFlowMultiplexer(engine)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
index 3906cb9a..7869d72d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -34,10 +34,10 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
override fun create(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem?,
+ engine: FlowEngine,
+ parent: FlowSystem?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
- ): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
+ ): SimHypervisor = SimSpaceSharedHypervisor(engine)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index 1801fcd0..b737d61a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.resources.interference.InterferenceDomain
-import org.opendc.simulator.resources.interference.InterferenceKey
+import org.opendc.simulator.flow.interference.InterferenceDomain
+import org.opendc.simulator.flow.interference.InterferenceKey
/**
* The interference domain of a hypervisor.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
index c2e00c8e..b3d72507 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.resources.interference.InterferenceKey
+import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.*
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
index 6577fbfc..f71446f8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
@@ -46,7 +46,7 @@ public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver {
for (cpu in cpus) {
targetFreq = max(cpu.capacity, targetFreq)
- totalSpeed += cpu.speed
+ totalSpeed += cpu.rate
}
val maxFreq = states.lastKey()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
index bf7aeff1..34e91c35 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
@@ -37,7 +37,7 @@ public class SimplePowerDriver(private val model: PowerModel) : PowerDriver {
for (cpu in cpus) {
targetFreq += cpu.capacity
- totalSpeed += cpu.speed
+ totalSpeed += cpu.rate
}
return model.computePower(totalSpeed / targetFreq)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index a01fa20c..99f4a1e1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -44,7 +44,7 @@ public class SimFlopsWorkload(
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
- cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)))
+ cpu.startConsumer(lifecycle.waitFor(FixedFlowSource(flops.toDouble() / ctx.cpus.size, utilization)))
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 4ee56689..2ef3bc43 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -44,7 +44,7 @@ public class SimRuntimeWorkload(
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
val limit = cpu.capacity * utilization
- cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization)))
+ cpu.startConsumer(lifecycle.waitFor(FixedFlowSource((limit / 1000) * duration, utilization)))
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index dd582bb2..a877dac1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
import kotlin.math.min
/**
@@ -78,12 +78,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
return now >= timestamp + duration
}
- private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ private inner class Consumer(val cpu: ProcessingUnit) : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val fragment = pullFragment(now)
if (fragment == null) {
- ctx.close()
+ conn.close()
return Long.MAX_VALUE
}
@@ -91,7 +91,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
// Fragment is in the future
if (timestamp > now) {
- ctx.push(0.0)
+ conn.push(0.0)
return timestamp - now
}
@@ -103,7 +103,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
val deadline = timestamp + fragment.duration
val duration = deadline - now
- ctx.push(if (cpu.id < cores && usage > 0.0) usage else 0.0)
+ conn.push(if (cpu.id < cores && usage > 0.0) usage else 0.0)
return duration
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index 5dd18271..dabe60e0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -23,9 +23,9 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
/**
* A helper class to manage the lifecycle of a [SimWorkload]
@@ -34,27 +34,27 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
/**
* The resource consumers which represent the lifecycle of the workload.
*/
- private val waiting = mutableSetOf<SimResourceConsumer>()
+ private val waiting = mutableSetOf<FlowSource>()
/**
* Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
*/
- public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer {
+ public fun waitFor(consumer: FlowSource): FlowSource {
waiting.add(consumer)
- return object : SimResourceConsumer by consumer {
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ return object : FlowSource by consumer {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
try {
- consumer.onEvent(ctx, event)
+ consumer.onEvent(conn, now, event)
} finally {
- if (event == SimResourceEvent.Exit) {
+ if (event == FlowEvent.Exit) {
complete(consumer)
}
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
try {
- consumer.onFailure(ctx, cause)
+ consumer.onFailure(conn, cause)
} finally {
complete(consumer)
}
@@ -65,9 +65,9 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
}
/**
- * Complete the specified [SimResourceConsumer].
+ * Complete the specified [FlowSource].
*/
- private fun complete(consumer: SimResourceConsumer) {
+ private fun complete(consumer: FlowSource) {
if (waiting.remove(consumer) && waiting.isEmpty()) {
ctx.close()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 81268879..0bb24ed8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -34,10 +34,10 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.compute.workload.SimWorkloadLifecycle
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.source.FixedFlowSource
import org.opendc.simulator.network.SimNetworkSink
import org.opendc.simulator.power.SimPowerSource
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* Test suite for the [SimBareMetalMachine] class.
@@ -60,7 +60,7 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -83,7 +83,7 @@ class SimMachineTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -100,13 +100,13 @@ class SimMachineTest {
@Test
fun testPower() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(LinearPowerModel(100.0, 50.0))
)
- val source = SimPowerSource(interpreter, capacity = 1000.0)
+ val source = SimPowerSource(engine, capacity = 1000.0)
source.connect(machine.psu)
try {
@@ -125,7 +125,7 @@ class SimMachineTest {
@Test
fun testCapacityClamp() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -151,7 +151,7 @@ class SimMachineTest {
@Test
fun testMemory() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -171,7 +171,7 @@ class SimMachineTest {
@Test
fun testMemoryUsage() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -180,7 +180,7 @@ class SimMachineTest {
machine.run(object : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
- ctx.memory.startConsumer(lifecycle.waitFor(SimWorkConsumer(ctx.memory.capacity, utilization = 0.8)))
+ ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8)))
}
})
@@ -192,22 +192,22 @@ class SimMachineTest {
@Test
fun testNetUsage() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
val adapter = (machine.peripherals[0] as SimNetworkAdapter)
- adapter.connect(SimNetworkSink(interpreter, adapter.bandwidth))
+ adapter.connect(SimNetworkSink(engine, adapter.bandwidth))
try {
machine.run(object : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
val iface = ctx.net[0]
- iface.tx.startConsumer(lifecycle.waitFor(SimWorkConsumer(iface.bandwidth, utilization = 0.8)))
+ iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8)))
}
})
@@ -219,9 +219,9 @@ class SimMachineTest {
@Test
fun testDiskReadUsage() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -231,7 +231,7 @@ class SimMachineTest {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
val disk = ctx.storage[0]
- disk.read.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.read.capacity, utilization = 0.8)))
+ disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8)))
}
})
@@ -243,9 +243,9 @@ class SimMachineTest {
@Test
fun testDiskWriteUsage() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -255,7 +255,7 @@ class SimMachineTest {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
val disk = ctx.storage[0]
- disk.write.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.write.capacity, utilization = 0.8)))
+ disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8)))
}
})
@@ -268,7 +268,7 @@ class SimMachineTest {
@Test
fun testCancellation() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -290,7 +290,7 @@ class SimMachineTest {
@Test
fun testConcurrentRuns() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -313,7 +313,7 @@ class SimMachineTest {
@Test
fun testClose() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt
index 6c9ec7bd..e5b509f0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt
@@ -29,8 +29,8 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.simulator.power.SimPowerSource
-import org.opendc.simulator.resources.SimResourceInterpreter
/**
* Test suite for [SimPsu]
@@ -55,8 +55,8 @@ internal class SimPsuTest {
val ratedOutputPower = 240.0
val energyEfficiency = mapOf(0.0 to 1.0)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = ratedOutputPower)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = ratedOutputPower)
val cpuLogic = mockk<PowerDriver.Logic>()
every { cpuLogic.computePower() } returns 0.0
@@ -78,8 +78,8 @@ internal class SimPsuTest {
1.0 to 0.94,
)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = ratedOutputPower)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = ratedOutputPower)
val cpuLogic = mockk<PowerDriver.Logic>()
every { cpuLogic.computePower() } returnsMany listOf(50.0, 100.0, 150.0, 200.0)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index 8cd535ad..058d5d28 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -40,7 +40,7 @@ import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* Test suite for the [SimHypervisor] class.
@@ -94,7 +94,7 @@ internal class SimHypervisorTest {
),
)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener)
@@ -163,7 +163,7 @@ internal class SimHypervisorTest {
)
)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -204,7 +204,7 @@ internal class SimHypervisorTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -234,7 +234,7 @@ internal class SimHypervisorTest {
)
val interferenceModel = VmInterferenceModel(groups)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 55d6d7c4..95fb6679 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -40,7 +40,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* A test suite for the [SimSpaceSharedHypervisor].
@@ -74,11 +74,11 @@ internal class SimSpaceSharedHypervisorTest {
),
)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
val vm = hypervisor.createMachine(machineModel)
@@ -98,11 +98,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -121,11 +121,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -142,11 +142,11 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runBlockingSimulation {
val duration = 5 * 60L * 1000
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -170,11 +170,9 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -194,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val interpreter = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
index c39859bf..f557c8d3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
@@ -55,7 +55,7 @@ internal class PStatePowerDriverTest {
val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true)
every { cpu.capacity } returns 3200.0
- every { cpu.speed } returns 1200.0
+ every { cpu.rate } returns 1200.0
val driver = PStatePowerDriver(
sortedMapOf(
@@ -77,10 +77,10 @@ internal class PStatePowerDriverTest {
val cpus = listOf(cpu, cpu)
every { cpus[0].capacity } returns 1000.0
- every { cpus[0].speed } returns 1200.0
+ every { cpus[0].rate } returns 1200.0
every { cpus[1].capacity } returns 3500.0
- every { cpus[1].speed } returns 1200.0
+ every { cpus[1].rate } returns 1200.0
val driver = PStatePowerDriver(
sortedMapOf(
@@ -112,11 +112,11 @@ internal class PStatePowerDriverTest {
val logic = driver.createLogic(machine, listOf(cpu))
- every { cpu.speed } returns 1400.0
+ every { cpu.rate } returns 1400.0
every { cpu.capacity } returns 1400.0
assertEquals(150.0, logic.computePower())
- every { cpu.speed } returns 1400.0
+ every { cpu.rate } returns 1400.0
every { cpu.capacity } returns 4000.0
assertEquals(235.0, logic.computePower())
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index 78019c2e..cdbffe4b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -31,7 +31,7 @@ import org.opendc.simulator.compute.model.*
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* Test suite for the [SimTraceWorkloadTest] class.
@@ -52,7 +52,7 @@ class SimTraceWorkloadTest {
@Test
fun testSmoke() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -79,7 +79,7 @@ class SimTraceWorkloadTest {
@Test
fun testOffset() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -106,7 +106,7 @@ class SimTraceWorkloadTest {
@Test
fun testSkipFragment() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -134,7 +134,7 @@ class SimTraceWorkloadTest {
@Test
fun testZeroCores() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index 68047d5c..5a956fee 100644
--- a/opendc-simulator/opendc-simulator-resources/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-description = "Uniform resource consumption simulation model"
+description = "High-performance flow simulator"
plugins {
`kotlin-library-conventions`
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index fbc3f319..4834f10f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -20,13 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow.source.TraceFlowSource
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
@@ -36,101 +38,101 @@ import java.util.concurrent.TimeUnit
@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@OptIn(ExperimentalCoroutinesApi::class)
-class SimResourceBenchmarks {
+class FlowBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var interpreter: SimResourceInterpreter
+ private lateinit var engine: FlowEngine
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
+ engine = FlowEngine(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
class Workload {
- lateinit var trace: Sequence<SimTraceConsumer.Fragment>
+ lateinit var trace: Sequence<TraceFlowSource.Fragment>
@Setup
fun setUp() {
val random = ThreadLocalRandom.current()
- val entries = List(10000) { SimTraceConsumer.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
+ val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
trace = entries.asSequence()
}
}
@Benchmark
- fun benchmarkSource(state: Workload) {
+ fun benchmarkSink(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, interpreter)
- return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
+ val provider = FlowSink(engine, 4200.0)
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkForwardOverhead(state: Workload) {
+ fun benchmarkForward(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, interpreter)
- val forwarder = SimResourceForwarder()
+ val provider = FlowSink(engine, 4200.0)
+ val forwarder = FlowForwarder(engine)
provider.startConsumer(forwarder)
- return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
+ return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
+ fun benchmarkMuxMaxMinSingleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(interpreter)
+ val switch = MaxMinFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
- val provider = switch.newOutput()
- return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
+ fun benchmarkMuxMaxMinTripleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(interpreter)
+ val switch = MaxMinFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
repeat(3) {
launch {
- val provider = switch.newOutput()
- provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(state.trace))
}
}
}
}
@Benchmark
- fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) {
+ fun benchmarkMuxExclusiveSingleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchExclusive()
+ val switch = ForwardingFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
- val provider = switch.newOutput()
- return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) {
+ fun benchmarkMuxExclusiveTripleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchExclusive()
+ val switch = ForwardingFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
repeat(2) {
launch {
- val provider = switch.newOutput()
- provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(state.trace))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index 085cba63..c8092082 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -20,25 +20,22 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
-import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+import org.opendc.simulator.flow.internal.FlowCountersImpl
/**
- * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations.
+ * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
*/
-public abstract class SimAbstractResourceProvider(
- private val interpreter: SimResourceInterpreter,
- initialCapacity: Double
-) : SimResourceProvider {
+public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer {
/**
- * A flag to indicate that the resource provider is active.
+ * A flag to indicate that the flow consumer is active.
*/
public override val isActive: Boolean
get() = ctx != null
/**
- * The capacity of the resource.
+ * The capacity of the consumer.
*/
public override var capacity: Double = initialCapacity
set(value) {
@@ -47,51 +44,51 @@ public abstract class SimAbstractResourceProvider(
}
/**
- * The current processing speed of the resource.
+ * The current processing rate of the consumer.
*/
- public override val speed: Double
- get() = ctx?.speed ?: 0.0
+ public override val rate: Double
+ get() = ctx?.rate ?: 0.0
/**
- * The resource processing speed demand at this instant.
+ * The flow processing rate demand at this instant.
*/
public override val demand: Double
get() = ctx?.demand ?: 0.0
/**
- * The resource counters to track the execution metrics of the resource.
+ * The flow counters to track the flow metrics of the consumer.
*/
- public override val counters: SimResourceCounters
+ public override val counters: FlowCounters
get() = _counters
- private val _counters = SimResourceCountersImpl()
+ private val _counters = FlowCountersImpl()
/**
- * The [SimResourceControllableContext] that is currently running.
+ * The [FlowConsumerContext] that is currently running.
*/
- protected var ctx: SimResourceControllableContext? = null
+ protected var ctx: FlowConsumerContext? = null
private set
/**
- * Construct the [SimResourceProviderLogic] instance for a new consumer.
+ * Construct the [FlowConsumerLogic] instance for a new source.
*/
- protected abstract fun createLogic(): SimResourceProviderLogic
+ protected abstract fun createLogic(): FlowConsumerLogic
/**
- * Start the specified [SimResourceControllableContext].
+ * Start the specified [FlowConsumerContext].
*/
- protected open fun start(ctx: SimResourceControllableContext) {
+ protected open fun start(ctx: FlowConsumerContext) {
ctx.start()
}
/**
- * The previous demand for the resource.
+ * The previous demand for the consumer.
*/
private var previousDemand = 0.0
/**
- * Update the counters of the resource provider.
+ * Update the counters of the flow consumer.
*/
- protected fun updateCounters(ctx: SimResourceContext, delta: Long) {
+ protected fun updateCounters(ctx: FlowConnection, delta: Long) {
val demand = previousDemand
previousDemand = ctx.demand
@@ -102,7 +99,7 @@ public abstract class SimAbstractResourceProvider(
val counters = _counters
val deltaS = delta / 1000.0
val work = demand * deltaS
- val actualWork = ctx.speed * deltaS
+ val actualWork = ctx.rate * deltaS
val remainingWork = work - actualWork
counters.demand += work
@@ -111,7 +108,7 @@ public abstract class SimAbstractResourceProvider(
}
/**
- * Update the counters of the resource provider.
+ * Update the counters of the flow consumer.
*/
protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
val counters = _counters
@@ -120,9 +117,9 @@ public abstract class SimAbstractResourceProvider(
counters.overcommit += overcommit
}
- final override fun startConsumer(consumer: SimResourceConsumer) {
- check(ctx == null) { "Resource is in invalid state" }
- val ctx = interpreter.newContext(consumer, createLogic())
+ final override fun startConsumer(source: FlowSource) {
+ check(ctx == null) { "Consumer is in invalid state" }
+ val ctx = engine.newContext(source, createLogic())
ctx.capacity = capacity
this.ctx = ctx
@@ -130,8 +127,8 @@ public abstract class SimAbstractResourceProvider(
start(ctx)
}
- final override fun interrupt() {
- ctx?.interrupt()
+ final override fun pull() {
+ ctx?.pull()
}
final override fun cancel() {
@@ -142,5 +139,5 @@ public abstract class SimAbstractResourceProvider(
}
}
- override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]"
+ override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
index 225cae0b..fa833961 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -20,49 +20,41 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
-
-import java.time.Clock
+package org.opendc.simulator.flow
/**
- * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
- * resource and a resource consumer.
+ * An active connection between a [FlowSource] and [FlowConsumer].
*/
-public interface SimResourceContext : AutoCloseable {
- /**
- * The virtual clock tracking simulation time.
- */
- public val clock: Clock
-
+public interface FlowConnection : AutoCloseable {
/**
- * The resource capacity available at this instant.
+ * The capacity of the connection.
*/
public val capacity: Double
/**
- * The resource processing speed at this instant.
+ * The flow rate over the connection.
*/
- public val speed: Double
+ public val rate: Double
/**
- * The resource processing speed demand at this instant.
+ * The flow demand of the source.
*/
public val demand: Double
/**
- * Ask the resource provider to interrupt its resource.
+ * Pull the source.
*/
- public fun interrupt()
+ public fun pull()
/**
- * Push the given flow to this context.
+ * Push the given flow [rate] over this connection.
*
* @param rate The rate of the flow to push.
*/
public fun push(rate: Double)
/**
- * Stop the resource context.
+ * Disconnect the consumer from its source.
*/
public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index b68b7261..3a6e2e97 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -20,77 +20,80 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
- * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
+ * A consumer of a [FlowSource].
*/
-public interface SimResourceProvider {
+public interface FlowConsumer {
/**
- * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer].
+ * A flag to indicate that the consumer is currently consuming a [FlowSource].
*/
public val isActive: Boolean
/**
- * The resource capacity available at this instant.
+ * The flow capacity of this consumer.
*/
public val capacity: Double
/**
- * The current processing speed of the resource.
+ * The current flow rate of the consumer.
*/
- public val speed: Double
+ public val rate: Double
/**
- * The resource processing speed demand at this instant.
+ * The current flow demand.
*/
public val demand: Double
/**
- * The resource counters to track the execution metrics of the resource.
+ * The flow counters to track the flow metrics of the consumer.
*/
- public val counters: SimResourceCounters
+ public val counters: FlowCounters
/**
- * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ * Start consuming the specified [source].
*
- * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
+ * @throws IllegalStateException if the consumer is already active.
*/
- public fun startConsumer(consumer: SimResourceConsumer)
+ public fun startConsumer(source: FlowSource)
/**
- * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
+ * Ask the consumer to pull its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
*/
- public fun interrupt()
+ public fun pull()
/**
- * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ * Disconnect the consumer from its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
*/
public fun cancel()
}
/**
- * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
- * the consumer has finished.
+ * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
*/
-public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
+public suspend fun FlowConsumer.consume(source: FlowSource) {
return suspendCancellableCoroutine { cont ->
- startConsumer(object : SimResourceConsumer by consumer {
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- consumer.onEvent(ctx, event)
+ startConsumer(object : FlowSource by source {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ source.onEvent(conn, now, event)
- if (event == SimResourceEvent.Exit && !cont.isCompleted) {
+ if (event == FlowEvent.Exit && !cont.isCompleted) {
cont.resume(Unit)
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
try {
- consumer.onFailure(ctx, cause)
+ source.onFailure(conn, cause)
cont.resumeWithException(cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
@@ -98,7 +101,7 @@ public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
}
}
- override fun toString(): String = "SimSuspendingResourceConsumer"
+ override fun toString(): String = "SuspendingFlowSource"
})
cont.invokeOnCancellation { cancel() }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
new file mode 100644
index 00000000..75b2d25b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A controllable [FlowConnection].
+ *
+ * This interface is used by [FlowConsumer]s to control the connection between it and the source.
+ */
+public interface FlowConsumerContext : FlowConnection {
+ /**
+ * The capacity of the connection.
+ */
+ public override var capacity: Double
+
+ /**
+ * Start the flow over the connection.
+ */
+ public fun start()
+
+ /**
+ * Synchronously flush the changes of the connection.
+ */
+ public fun flush()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index cc718165..c69cb17e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -20,23 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A collection of callbacks associated with a flow stage.
+ * A collection of callbacks associated with a [FlowConsumer].
*/
-public interface SimResourceProviderLogic {
+public interface FlowConsumerLogic {
/**
- * This method is invoked when the consumer ask to consume the resource for the specified [duration].
+ * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds.
- * @param limit The limit on the work rate of the resource consumer.
- * @param duration The duration of the consumption in milliseconds.
- * @return The deadline of the resource consumption.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ * @param rate The requested processing rate of the source.
*/
- public fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {}
+ public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -45,14 +43,14 @@ public interface SimResourceProviderLogic {
* @param now The virtual timestamp in milliseconds at which the system converged.
* @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {}
+ public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
/**
- * This method is invoked when the resource consumer has finished.
+ * This method is invoked when the [FlowSource] is completed.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
*/
- public fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {}
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index 11924db2..e15d7643 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -20,34 +20,34 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * An interface that tracks cumulative counts of the work performed by a resource.
+ * An interface that tracks cumulative counts of the flow accumulation over a stage.
*/
-public interface SimResourceCounters {
+public interface FlowCounters {
/**
- * The amount of work that resource consumers wanted the resource to perform.
+ * The accumulated flow that a source wanted to push over the connection.
*/
public val demand: Double
/**
- * The amount of work performed by the resource.
+ * The accumulated flow that was actually transferred over the connection.
*/
public val actual: Double
/**
- * The amount of work that could not be completed due to overcommitted resources.
+ * The accumulated flow that could not be transferred over the connection.
*/
public val overcommit: Double
/**
- * The amount of work lost due to interference.
+ * The accumulated flow lost due to interference between sources.
*/
public val interference: Double
/**
- * Reset the resource counters.
+ * Reset the flow counters.
*/
public fun reset()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
index 4bfeaf20..65224827 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
@@ -20,31 +20,31 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
import java.time.Clock
import kotlin.coroutines.CoroutineContext
/**
- * The resource interpreter is responsible for managing the interaction between resource consumer and provider.
+ * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
*
- * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation
+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
* to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
*/
-public interface SimResourceInterpreter {
+public interface FlowEngine {
/**
- * The [Clock] associated with this interpreter.
+ * The virtual [Clock] associated with this engine.
*/
public val clock: Clock
/**
- * Create a new [SimResourceControllableContext] with the given [provider].
+ * Create a new [FlowConsumerContext] with the given [provider].
*
* @param consumer The consumer logic.
* @param provider The logic of the resource provider.
*/
- public fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext
+ public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
/**
* Start batching the execution of resource updates until [popBatch] is called.
@@ -67,14 +67,15 @@ public interface SimResourceInterpreter {
public companion object {
/**
- * Construct a new [SimResourceInterpreter] implementation.
+ * Construct a new [FlowEngine] implementation.
*
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
+ @JvmStatic
@JvmName("create")
- public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter {
- return SimResourceInterpreterImpl(context, clock)
+ public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
+ return FlowEngineImpl(context, clock)
}
}
}
@@ -84,7 +85,7 @@ public interface SimResourceInterpreter {
*
* This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
*/
-public inline fun SimResourceInterpreter.batch(block: () -> Unit) {
+public inline fun FlowEngine.batch(block: () -> Unit) {
try {
pushBatch()
block()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
index 959427f1..14c85183 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
@@ -20,29 +20,29 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A resource event that is communicated to the resource consumer.
+ * A flow event that is communicated to a [FlowSource].
*/
-public enum class SimResourceEvent {
+public enum class FlowEvent {
/**
- * This event is emitted to the consumer when it has started.
+ * This event is emitted to the source when it has started.
*/
Start,
/**
- * This event is emitted to the consumer when it has exited.
+ * This event is emitted to the source when it is stopped.
*/
Exit,
/**
- * This event is emitted to the consumer when it has started a new resource consumption or idle cycle.
+ * This event is emitted to the source when the system has converged into a steady state.
*/
- Run,
+ Converge,
/**
- * This event is emitted to the consumer when the capacity of the resource has changed.
+ * This event is emitted to the source when the capacity of the consumer has changed.
*/
Capacity,
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 0cd2bfc7..2074033e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -20,21 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
-import org.opendc.simulator.resources.impl.SimResourceCountersImpl
-import java.time.Clock
+import org.opendc.simulator.flow.internal.FlowCountersImpl
/**
- * A class that acts as a [SimResourceConsumer] and [SimResourceProvider] at the same time.
+ * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
*
+ * @param engine The [FlowEngine] the forwarder runs in.
* @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
*/
-public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimResourceConsumer, SimResourceProvider, AutoCloseable {
+public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable {
/**
- * The delegate [SimResourceConsumer].
+ * The delegate [FlowSource].
*/
- private var delegate: SimResourceConsumer? = null
+ private var delegate: FlowSource? = null
/**
* A flag to indicate that the delegate was started.
@@ -42,28 +42,25 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
private var hasDelegateStarted: Boolean = false
/**
- * The exposed [SimResourceContext].
+ * The exposed [FlowConnection].
*/
- private val _ctx = object : SimResourceContext {
- override val clock: Clock
- get() = _innerCtx!!.clock
-
+ private val _ctx = object : FlowConnection {
override val capacity: Double
get() = _innerCtx?.capacity ?: 0.0
override val demand: Double
get() = _innerCtx?.demand ?: 0.0
- override val speed: Double
- get() = _innerCtx?.speed ?: 0.0
+ override val rate: Double
+ get() = _innerCtx?.rate ?: 0.0
- override fun interrupt() {
- _innerCtx?.interrupt()
+ override fun pull() {
+ _innerCtx?.pull()
}
override fun push(rate: Double) {
_innerCtx?.push(rate)
- _limit = rate
+ _demand = rate
}
override fun close() {
@@ -78,14 +75,14 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
// reset beforehand the existing state and check whether it has been updated afterwards
reset()
- delegate.onEvent(this, SimResourceEvent.Exit)
+ delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit)
}
}
/**
- * The [SimResourceContext] in which the forwarder runs.
+ * The [FlowConnection] in which the forwarder runs.
*/
- private var _innerCtx: SimResourceContext? = null
+ private var _innerCtx: FlowConnection? = null
override val isActive: Boolean
get() = delegate != null
@@ -93,27 +90,27 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
override val capacity: Double
get() = _ctx.capacity
- override val speed: Double
- get() = _ctx.speed
+ override val rate: Double
+ get() = _ctx.rate
override val demand: Double
get() = _ctx.demand
- override val counters: SimResourceCounters
+ override val counters: FlowCounters
get() = _counters
- private val _counters = SimResourceCountersImpl()
+ private val _counters = FlowCountersImpl()
- override fun startConsumer(consumer: SimResourceConsumer) {
- check(delegate == null) { "Resource transformer already active" }
+ override fun startConsumer(source: FlowSource) {
+ check(delegate == null) { "Forwarder already active" }
- delegate = consumer
+ delegate = source
- // Interrupt the provider to replace the consumer
- interrupt()
+ // Pull to replace the source
+ pull()
}
- override fun interrupt() {
- _ctx.interrupt()
+ override fun pull() {
+ _ctx.pull()
}
override fun cancel() {
@@ -124,7 +121,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
this.delegate = null
if (ctx != null) {
- delegate.onEvent(this._ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit)
}
}
}
@@ -134,41 +131,41 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
if (ctx != null) {
this._innerCtx = null
- ctx.interrupt()
+ ctx.pull()
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
start()
}
- updateCounters(ctx, delta)
+ updateCounters(conn, delta)
- return delegate?.onNext(this._ctx, now, delta) ?: Long.MAX_VALUE
+ return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> {
- _innerCtx = ctx
+ FlowEvent.Start -> {
+ _innerCtx = conn
}
- SimResourceEvent.Exit -> {
+ FlowEvent.Exit -> {
_innerCtx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onEvent(this._ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this._ctx, now, FlowEvent.Exit)
}
}
- else -> delegate?.onEvent(this._ctx, event)
+ else -> delegate?.onEvent(this._ctx, now, event)
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
_innerCtx = null
val delegate = delegate
@@ -183,7 +180,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onEvent(checkNotNull(_innerCtx), SimResourceEvent.Start)
+ delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start)
hasDelegateStarted = true
}
@@ -197,22 +194,22 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
}
/**
- * The requested speed.
+ * The requested flow rate.
*/
- private var _limit: Double = 0.0
+ private var _demand: Double = 0.0
/**
- * Update the resource counters for the transformer.
+ * Update the flow counters for the transformer.
*/
- private fun updateCounters(ctx: SimResourceContext, delta: Long) {
+ private fun updateCounters(ctx: FlowConnection, delta: Long) {
if (delta <= 0) {
return
}
val counters = _counters
val deltaS = delta / 1000.0
- val work = _limit * deltaS
- val actualWork = ctx.speed * deltaS
+ val work = _demand * deltaS
+ val actualWork = ctx.rate * deltaS
counters.demand += work
counters.actual += actualWork
counters.overcommit += (work - actualWork)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index c8d4cf0d..fb6ca85d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -20,42 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity.
+ * A [FlowSink] represents a sink with a fixed capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param interpreter The interpreter that is used for managing the resource contexts.
- * @param parent The parent resource system.
+ * @param engine The engine that is used for driving the flow simulation.
+ * @param parent The parent flow system.
*/
-public class SimResourceSource(
+public class FlowSink(
+ private val engine: FlowEngine,
initialCapacity: Double,
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null
-) : SimAbstractResourceProvider(interpreter, initialCapacity) {
- override fun createLogic(): SimResourceProviderLogic {
- return object : SimResourceProviderLogic {
- override fun onConsume(
- ctx: SimResourceControllableContext,
+ private val parent: FlowSystem? = null
+) : AbstractFlowConsumer(engine, initialCapacity) {
+
+ override fun createLogic(): FlowConsumerLogic {
+ return object : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
now: Long,
delta: Long,
- limit: Double,
- duration: Long
+ rate: Double
) {
updateCounters(ctx, delta)
}
- override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
updateCounters(ctx, delta)
cancel()
}
- override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
parent?.onConverge(now)
}
}
}
- override fun toString(): String = "SimResourceSource[capacity=$capacity]"
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 0b25358a..077b4d38 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -20,38 +20,39 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A [SimResourceConsumer] characterizes how a resource is consumed.
+ * A source of flow that is consumed by a [FlowConsumer].
*
- * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
- * for multiple resource providers, unless explicitly said otherwise.
+ * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
+ * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
*/
-public interface SimResourceConsumer {
+public interface FlowSource {
/**
- * This method is invoked when the resource provider is pulling this resource consumer.
+ * This method is invoked when the source is pulled.
*
- * @param ctx The execution context in which the consumer runs.
- * @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call in milliseconds.
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the pull is occurring.
+ * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
* @return The duration after which the resource consumer should be pulled again.
*/
- public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long
+ public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
/**
* This method is invoked when an event has occurred.
*
- * @param ctx The execution context in which the consumer runs.
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the event is occurring.
* @param event The event that has occurred.
*/
- public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {}
+ public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {}
/**
- * This method is invoked when a resource consumer throws an exception.
+ * This method is invoked when the source throws an exception.
*
- * @param ctx The execution context in which the consumer runs.
+ * @param conn The connection between the source and consumer.
* @param cause The cause of the failure.
*/
- public fun onFailure(ctx: SimResourceContext, cause: Throwable) {}
+ public fun onFailure(conn: FlowConnection, cause: Throwable) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
index 609262cb..db6aa69f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
* A system of possible multiple sub-resources.
@@ -28,11 +28,11 @@ package org.opendc.simulator.resources
* This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the
* resource provider.
*/
-public interface SimResourceSystem {
+public interface FlowSystem {
/**
* The parent system to which this system belongs or `null` if it has no parent.
*/
- public val parent: SimResourceSystem?
+ public val parent: FlowSystem?
/**
* This method is invoked when the system has converged to a steady-state.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
index 1066777f..aa2713b6 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
@@ -1,10 +1,10 @@
-package org.opendc.simulator.resources.interference
+package org.opendc.simulator.flow.interference
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.flow.FlowSource
/**
- * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur
- * performance variability due to operating on the same resources and therefore causing interference.
+ * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur
+ * performance variability due to operating on the same resource and therefore causing interference.
*/
public interface InterferenceDomain {
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
index 8b12e7b4..d28ebde5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.interference
+package org.opendc.simulator.flow.interference
/**
* A key that uniquely identifies a participant of an interference domain.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index cbfa7afd..9f3afc4d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -20,28 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.impl
+package org.opendc.simulator.flow.internal
-import org.opendc.simulator.resources.*
-import java.time.Clock
+import org.opendc.simulator.flow.*
import java.util.ArrayDeque
import kotlin.math.max
import kotlin.math.min
/**
- * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
*/
-internal class SimResourceContextImpl(
- private val interpreter: SimResourceInterpreterImpl,
- private val consumer: SimResourceConsumer,
- private val logic: SimResourceProviderLogic
-) : SimResourceControllableContext {
+internal class FlowConsumerContextImpl(
+ private val engine: FlowEngineImpl,
+ private val source: FlowSource,
+ private val logic: FlowConsumerLogic
+) : FlowConsumerContext {
/**
- * The clock of the context.
+ * The clock to track simulation time.
*/
- override val clock: Clock
- get() = _clock
- private val _clock = interpreter.clock
+ private val _clock = engine.clock
/**
* The capacity of the resource.
@@ -65,7 +62,7 @@ internal class SimResourceContextImpl(
/**
* The current processing speed of the resource.
*/
- override val speed: Double
+ override val rate: Double
get() = _rate
private var _rate = 0.0
@@ -101,14 +98,14 @@ internal class SimResourceContextImpl(
/**
* The timers at which the context is scheduled to be interrupted.
*/
- private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque()
+ private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque()
override fun start() {
check(_state == State.Pending) { "Consumer is already started" }
- interpreter.batch {
- consumer.onEvent(this, SimResourceEvent.Start)
+ engine.batch {
+ source.onEvent(this, _clock.millis(), FlowEvent.Start)
_state = State.Active
- interrupt()
+ pull()
}
}
@@ -117,36 +114,27 @@ internal class SimResourceContextImpl(
return
}
- interpreter.batch {
+ engine.batch {
_state = State.Stopped
if (!_updateActive) {
- val now = clock.millis()
+ val now = _clock.millis()
val delta = max(0, now - _lastUpdate)
doStop(now, delta)
// FIX: Make sure the context converges
_flag = _flag or FLAG_INVALIDATE
- scheduleUpdate(clock.millis())
+ scheduleUpdate(_clock.millis())
}
}
}
- override fun interrupt() {
+ override fun pull() {
if (_state == State.Stopped) {
return
}
_flag = _flag or FLAG_INTERRUPT
- scheduleUpdate(clock.millis())
- }
-
- override fun invalidate() {
- if (_state == State.Stopped) {
- return
- }
-
- _flag = _flag or FLAG_INVALIDATE
- scheduleUpdate(clock.millis())
+ scheduleUpdate(_clock.millis())
}
override fun flush() {
@@ -154,7 +142,7 @@ internal class SimResourceContextImpl(
return
}
- interpreter.scheduleSync(clock.millis(), this)
+ engine.scheduleSync(_clock.millis(), this)
}
override fun push(rate: Double) {
@@ -167,7 +155,8 @@ internal class SimResourceContextImpl(
// Invalidate only if the active limit is change and no update is active
// If an update is active, it will already get picked up at the end of the update
if (_activeLimit != rate && !_updateActive) {
- invalidate()
+ _flag = _flag or FLAG_INVALIDATE
+ scheduleUpdate(_clock.millis())
}
}
@@ -196,7 +185,7 @@ internal class SimResourceContextImpl(
val delta = max(0, now - lastUpdate)
try {
- val duration = consumer.onNext(this, now, delta)
+ val duration = source.onPull(this, now, delta)
val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration
// Reset update flags
@@ -205,7 +194,7 @@ internal class SimResourceContextImpl(
// Check whether the state has changed after [consumer.onNext]
when (_state) {
State.Active -> {
- logic.onConsume(this, now, delta, _limit, duration)
+ logic.onPush(this, now, delta, _limit)
// Schedule an update at the new deadline
scheduleUpdate(now, newDeadline)
@@ -261,7 +250,7 @@ internal class SimResourceContextImpl(
try {
if (_state == State.Active) {
- consumer.onEvent(this, SimResourceEvent.Run)
+ source.onEvent(this, timestamp, FlowEvent.Converge)
}
logic.onConverge(this, timestamp, delta)
@@ -270,14 +259,14 @@ internal class SimResourceContextImpl(
}
}
- override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]"
+ override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
/**
* Stop the resource context.
*/
private fun doStop(now: Long, delta: Long) {
try {
- consumer.onEvent(this, SimResourceEvent.Exit)
+ source.onEvent(this, now, FlowEvent.Exit)
logic.onFinish(this, now, delta)
} catch (cause: Throwable) {
doFail(now, delta, cause)
@@ -292,7 +281,7 @@ internal class SimResourceContextImpl(
*/
private fun doFail(now: Long, delta: Long, cause: Throwable) {
try {
- consumer.onFailure(this, cause)
+ source.onFailure(this, cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
e.printStackTrace()
@@ -310,11 +299,11 @@ internal class SimResourceContextImpl(
return
}
- interpreter.batch {
+ engine.batch {
// Inform the consumer of the capacity change. This might already trigger an interrupt.
- consumer.onEvent(this, SimResourceEvent.Capacity)
+ source.onEvent(this, _clock.millis(), FlowEvent.Capacity)
- interrupt()
+ pull()
}
}
@@ -322,7 +311,7 @@ internal class SimResourceContextImpl(
* Schedule an update for this resource context.
*/
private fun scheduleUpdate(now: Long) {
- interpreter.scheduleImmediate(now, this)
+ engine.scheduleImmediate(now, this)
}
/**
@@ -331,7 +320,7 @@ internal class SimResourceContextImpl(
private fun scheduleUpdate(now: Long, target: Long) {
val timers = _timers
if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) {
- timers.addFirst(interpreter.scheduleDelayed(now, this, target))
+ timers.addFirst(engine.scheduleDelayed(now, this, target))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
index 01062179..141d335d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.impl
+package org.opendc.simulator.flow.internal
-import org.opendc.simulator.resources.SimResourceCounters
+import org.opendc.simulator.flow.FlowCounters
/**
- * Mutable implementation of the [SimResourceCounters] interface.
+ * Mutable implementation of the [FlowCounters] interface.
*/
-internal class SimResourceCountersImpl : SimResourceCounters {
+internal class FlowCountersImpl : FlowCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
override var overcommit: Double = 0.0
@@ -41,6 +41,6 @@ internal class SimResourceCountersImpl : SimResourceCounters {
}
override fun toString(): String {
- return "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 2abf0749..1a50da2c 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -20,25 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.impl
+package org.opendc.simulator.flow.internal
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
import java.time.Clock
import java.util.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
/**
- * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after.
+ * Internal implementation of the [FlowEngine] interface.
*
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
-internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter {
+internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine {
/**
* The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
@@ -46,24 +46,24 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
/**
- * The queue of resource updates that are scheduled for immediate execution.
+ * The queue of connection updates that are scheduled for immediate execution.
*/
- private val queue = ArrayDeque<SimResourceContextImpl>()
+ private val queue = ArrayDeque<FlowConsumerContextImpl>()
/**
- * A priority queue containing the resource updates to be scheduled in the future.
+ * A priority queue containing the connection updates to be scheduled in the future.
*/
private val futureQueue = PriorityQueue<Timer>()
/**
- * The stack of interpreter invocations to occur in the future.
+ * The stack of engine invocations to occur in the future.
*/
private val futureInvocations = ArrayDeque<Invocation>()
/**
- * The systems that have been visited during the interpreter cycle.
+ * The systems that have been visited during the engine cycle.
*/
- private val visited = linkedSetOf<SimResourceContextImpl>()
+ private val visited = linkedSetOf<FlowConsumerContextImpl>()
/**
* The index in the batch stack.
@@ -71,7 +71,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
private var batchIndex = 0
/**
- * A flag to indicate that the interpreter is currently active.
+ * A flag to indicate that the engine is currently active.
*/
private val isRunning: Boolean
get() = batchIndex > 0
@@ -79,57 +79,57 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* Update the specified [ctx] synchronously.
*/
- fun scheduleSync(now: Long, ctx: SimResourceContextImpl) {
+ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
ctx.doUpdate(now)
visited.add(ctx)
- // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
- // up by the active interpreter.
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
if (isRunning) {
return
}
try {
batchIndex++
- runInterpreter(now)
+ runEngine(now)
} finally {
batchIndex--
}
}
/**
- * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle.
+ * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
*
- * This method should be used when the state of a resource context is invalidated/interrupted and needs to be
- * re-computed. In case no interpreter is currently active, the interpreter will be started.
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed. In case no engine is currently active, the engine will be started.
*/
- fun scheduleImmediate(now: Long, ctx: SimResourceContextImpl) {
+ fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
queue.add(ctx)
- // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
- // up by the active interpreter.
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
if (isRunning) {
return
}
try {
batchIndex++
- runInterpreter(now)
+ runEngine(now)
} finally {
batchIndex--
}
}
/**
- * Schedule the interpreter to run at [target] to update the resource contexts.
+ * Schedule the engine to run at [target] to update the flow contexts.
*
* This method will override earlier calls to this method for the same [ctx].
*
* @param now The current virtual timestamp.
- * @param ctx The resource context to which the event applies.
+ * @param ctx The flow context to which the event applies.
* @param target The timestamp when the interrupt should happen.
*/
- fun scheduleDelayed(now: Long, ctx: SimResourceContextImpl, target: Long): Timer {
+ fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer {
val futureQueue = futureQueue
require(target >= now) { "Timestamp must be in the future" }
@@ -140,7 +140,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
return timer
}
- override fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext = SimResourceContextImpl(this, consumer, provider)
+ override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
override fun pushBatch() {
batchIndex++
@@ -150,7 +150,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
try {
// Flush the work if the platform is not already running
if (batchIndex == 1 && queue.isNotEmpty()) {
- runInterpreter(clock.millis())
+ runEngine(clock.millis())
}
} finally {
batchIndex--
@@ -158,9 +158,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * Interpret all actions that are scheduled for the current timestamp.
+ * Run all the enqueued actions for the specified [timestamp][now].
*/
- private fun runInterpreter(now: Long) {
+ private fun runEngine(now: Long) {
val queue = queue
val futureQueue = futureQueue
val futureInvocations = futureInvocations
@@ -219,7 +219,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
visited.clear()
} while (queue.isNotEmpty())
- // Schedule an interpreter invocation for the next update to occur.
+ // Schedule an engine invocation for the next update to occur.
val headTimer = futureQueue.peek()
if (headTimer != null) {
trySchedule(now, futureInvocations, headTimer.target)
@@ -227,10 +227,10 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * Try to schedule an interpreter invocation at the specified [target].
+ * Try to schedule an engine invocation at the specified [target].
*
* @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the interpreter invocation should happen.
+ * @param target The virtual timestamp at which the engine invocation should happen.
* @param scheduled The queue of scheduled invocations.
*/
private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
@@ -245,7 +245,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
{
try {
batchIndex++
- runInterpreter(target)
+ runEngine(target)
} finally {
batchIndex--
}
@@ -267,9 +267,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * A future interpreter invocation.
+ * A future engine invocation.
*
- * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case
+ * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
* the invocation is not needed anymore, it can be cancelled via [cancel].
*/
private data class Invocation(
@@ -277,7 +277,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
@JvmField val handle: DisposableHandle
) {
/**
- * Cancel the interpreter invocation.
+ * Cancel the engine invocation.
*/
fun cancel() = handle.dispose()
}
@@ -285,10 +285,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* An update call for [ctx] that is scheduled for [target].
*
- * This class represents an update in the future at [target] requested by [ctx]. A deferred update might be
- * cancelled if the resource context was invalidated in the meantime.
+ * This class represents an update in the future at [target] requested by [ctx].
*/
- class Timer(@JvmField val ctx: SimResourceContextImpl, @JvmField val target: Long) : Comparable<Timer> {
+ class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> {
override fun compareTo(other: Timer): Int {
return target.compareTo(other.target)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 3c25b76d..17b82391 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -20,45 +20,48 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.mux
-import org.opendc.simulator.resources.interference.InterferenceKey
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowCounters
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.interference.InterferenceKey
/**
- * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
+ * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
*/
-public interface SimResourceSwitch {
+public interface FlowMultiplexer {
/**
- * The output resource providers to which resource consumers can be attached.
+ * The inputs of the multiplexer that can be used to consume sources.
*/
- public val outputs: Set<SimResourceProvider>
+ public val inputs: Set<FlowConsumer>
/**
- * The input resources that will be switched between the output providers.
+ * The outputs of the multiplexer over which the flows will be distributed.
*/
- public val inputs: Set<SimResourceProvider>
+ public val outputs: Set<FlowConsumer>
/**
- * The resource counters to track the execution metrics of all switch resources.
+ * The flow counters to track the flow metrics of all multiplexer inputs.
*/
- public val counters: SimResourceCounters
+ public val counters: FlowCounters
/**
- * Create a new output on the switch.
+ * Create a new input on this multiplexer.
*
- * @param key The key of the interference member to which the output belongs.
+ * @param key The key of the interference member to which the input belongs.
*/
- public fun newOutput(key: InterferenceKey? = null): SimResourceProvider
+ public fun newInput(key: InterferenceKey? = null): FlowConsumer
/**
- * Remove [output] from this switch.
+ * Remove [input] from this multiplexer.
*/
- public fun removeOutput(output: SimResourceProvider)
+ public fun removeInput(input: FlowConsumer)
/**
- * Add the specified [input] to the switch.
+ * Add the specified [output] to the multiplexer.
*/
- public fun addInput(input: SimResourceProvider)
+ public fun addOutput(output: FlowConsumer)
/**
* Clear all inputs and outputs from the switch.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
new file mode 100644
index 00000000..811d9460
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
+import java.util.ArrayDeque
+
+/**
+ * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
+ * that a single input is directly connected to an output and that the multiplexer can only support as many
+ * inputs as outputs.
+ *
+ * @param engine The [FlowEngine] driving the simulation.
+ */
+public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer {
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+
+ override val outputs: Set<FlowConsumer>
+ get() = _outputs
+ private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _availableOutputs = ArrayDeque<FlowForwarder>()
+
+ override val counters: FlowCounters = object : FlowCounters {
+ override val demand: Double
+ get() = _outputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _outputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _outputs.sumOf { it.counters.overcommit }
+ override val interference: Double
+ get() = _outputs.sumOf { it.counters.interference }
+
+ override fun reset() {
+ for (input in _outputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val output = Input(forwarder)
+ _inputs += output
+ return output
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+
+ (input as Input).close()
+ }
+
+ override fun addOutput(output: FlowConsumer) {
+ if (output in outputs) {
+ return
+ }
+
+ val forwarder = FlowForwarder(engine)
+
+ _outputs += output
+ _availableOutputs += forwarder
+
+ output.startConsumer(object : FlowSource by forwarder {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Exit) {
+ // De-register the output after it has finished
+ _outputs -= output
+ }
+
+ forwarder.onEvent(conn, now, event)
+ }
+ })
+ }
+
+ override fun clear() {
+ for (input in _outputs) {
+ input.cancel()
+ }
+ _outputs.clear()
+
+ // Inputs are implicitly cancelled by the output forwarders
+ _inputs.clear()
+ }
+
+ /**
+ * An input on the multiplexer.
+ */
+ private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
+ /**
+ * Close the input.
+ */
+ fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across input resources.
+ _inputs -= this
+ _availableOutputs += forwarder
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
new file mode 100644
index 00000000..9735f121
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -0,0 +1,399 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceDomain
+import org.opendc.simulator.flow.interference.InterferenceKey
+import org.opendc.simulator.flow.internal.FlowCountersImpl
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
+ *
+ * @param engine The [FlowEngine] to drive the flow simulation.
+ * @param parent The parent flow system of the multiplexer.
+ * @param interferenceDomain The interference domain of the multiplexer.
+ */
+public class MaxMinFlowMultiplexer(
+ private val engine: FlowEngine,
+ private val parent: FlowSystem? = null,
+ private val interferenceDomain: InterferenceDomain? = null
+) : FlowMultiplexer {
+ /**
+ * The inputs of the multiplexer.
+ */
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+ private val _activeInputs = mutableListOf<Input>()
+
+ /**
+ * The outputs of the multiplexer.
+ */
+ override val outputs: Set<FlowConsumer>
+ get() = _outputs
+ private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _activeOutputs = mutableListOf<Output>()
+
+ /**
+ * The flow counters of this multiplexer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = FlowCountersImpl()
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ private var _rate = 0.0
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ private var _demand = 0.0
+
+ /**
+ * The capacity of the outputs.
+ */
+ private var _capacity = 0.0
+
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val provider = Input(_capacity, key)
+ _inputs.add(provider)
+ return provider
+ }
+
+ override fun addOutput(output: FlowConsumer) {
+ val consumer = Output(output)
+ if (_outputs.add(output)) {
+ _activeOutputs.add(consumer)
+ output.startConsumer(consumer)
+ }
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+ // This cast should always succeed since only `Input` instances should be added to `_inputs`
+ (input as Input).close()
+ }
+
+ override fun clear() {
+ for (input in _activeOutputs) {
+ input.cancel()
+ }
+ _activeOutputs.clear()
+
+ for (output in _activeInputs) {
+ output.cancel()
+ }
+ _activeInputs.clear()
+ }
+
+ /**
+ * Converge the scheduler of the multiplexer.
+ */
+ private fun runScheduler(now: Long) {
+ if (_schedulerActive) {
+ return
+ }
+
+ _schedulerActive = true
+ try {
+ doSchedule(now)
+ } finally {
+ _schedulerActive = false
+ }
+ }
+
+ /**
+ * Schedule the inputs over the outputs.
+ */
+ private fun doSchedule(now: Long) {
+ val activeInputs = _activeInputs
+ val activeOutputs = _activeOutputs
+
+ // If there is no work yet, mark the inputs as idle.
+ if (activeInputs.isEmpty()) {
+ return
+ }
+
+ val capacity = _capacity
+ var availableCapacity = capacity
+
+ // Pull in the work of the outputs
+ val inputIterator = activeInputs.listIterator()
+ for (input in inputIterator) {
+ input.pull(now)
+
+ // Remove outputs that have finished
+ if (!input.isActive) {
+ inputIterator.remove()
+ }
+ }
+
+ var demand = 0.0
+
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeInputs.sort()
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ var remaining = activeInputs.size
+ for (input in activeInputs) {
+ val availableShare = availableCapacity / remaining--
+ val grantedRate = min(input.allowedRate, availableShare)
+
+ // Ignore empty sources
+ if (grantedRate <= 0.0) {
+ input.actualRate = 0.0
+ continue
+ }
+
+ input.actualRate = grantedRate
+ demand += input.limit
+ availableCapacity -= grantedRate
+ }
+
+ val rate = capacity - availableCapacity
+
+ _demand = demand
+ _rate = rate
+
+ // Sort all consumers by their capacity
+ activeOutputs.sort()
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (output in activeOutputs) {
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
+ }
+
+ /**
+ * Recompute the capacity of the multiplexer.
+ */
+ private fun updateCapacity() {
+ val newCapacity = _activeOutputs.sumOf(Output::capacity)
+
+ // No-op if the capacity is unchanged
+ if (_capacity == newCapacity) {
+ return
+ }
+
+ _capacity = newCapacity
+
+ for (input in _inputs) {
+ input.capacity = newCapacity
+ }
+ }
+
+ /**
+ * An internal [FlowConsumer] implementation for multiplexer inputs.
+ */
+ private inner class Input(capacity: Double, val key: InterferenceKey?) :
+ AbstractFlowConsumer(engine, capacity),
+ FlowConsumerLogic,
+ Comparable<Input> {
+ /**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ val allowedRate: Double
+ get() = min(capacity, limit)
+
+ /**
+ * A flag to indicate that the input is closed.
+ */
+ private var _isClosed: Boolean = false
+
+ /**
+ * The timestamp at which we received the last command.
+ */
+ private var _lastPull: Long = Long.MIN_VALUE
+
+ /**
+ * Close the input.
+ *
+ * This method is invoked when the user removes an input from the switch.
+ */
+ fun close() {
+ _isClosed = true
+ cancel()
+ }
+
+ /* AbstractFlowConsumer */
+ override fun createLogic(): FlowConsumerLogic = this
+
+ override fun start(ctx: FlowConsumerContext) {
+ check(!_isClosed) { "Cannot re-use closed input" }
+
+ _activeInputs += this
+ super.start(ctx)
+ }
+
+ /* FlowConsumerLogic */
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ doUpdateCounters(delta)
+
+ actualRate = 0.0
+ this.limit = rate
+ _lastPull = now
+
+ runScheduler(now)
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ doUpdateCounters(delta)
+
+ limit = 0.0
+ actualRate = 0.0
+ _lastPull = now
+ }
+
+ /* Comparable */
+ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
+
+ /**
+ * Pull the source if necessary.
+ */
+ fun pull(now: Long) {
+ val ctx = ctx
+ if (ctx != null && _lastPull < now) {
+ ctx.flush()
+ }
+ }
+
+ /**
+ * Helper method to update the flow counters of the multiplexer.
+ */
+ private fun doUpdateCounters(delta: Long) {
+ if (delta <= 0L) {
+ return
+ }
+
+ // Compute the performance penalty due to flow interference
+ val perfScore = if (interferenceDomain != null) {
+ val load = _rate / capacity
+ interferenceDomain.apply(key, load)
+ } else {
+ 1.0
+ }
+
+ val deltaS = delta / 1000.0
+ val work = limit * deltaS
+ val actualWork = actualRate * deltaS
+ val remainingWork = work - actualWork
+
+ updateCounters(work, actualWork, remainingWork)
+
+ val distCounters = _counters
+ distCounters.demand += work
+ distCounters.actual += actualWork
+ distCounters.overcommit += remainingWork
+ distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ }
+ }
+
+ /**
+ * An internal [FlowSource] implementation for multiplexer outputs.
+ */
+ private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ /**
+ * The active [FlowConnection] of this source.
+ */
+ private var _ctx: FlowConnection? = null
+
+ /**
+ * The capacity of this output.
+ */
+ val capacity: Double
+ get() = _ctx?.capacity ?: 0.0
+
+ /**
+ * Push the specified rate to the consumer.
+ */
+ fun push(rate: Double) {
+ _ctx?.push(rate)
+ }
+
+ /**
+ * Cancel this output.
+ */
+ fun cancel() {
+ provider.cancel()
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ runScheduler(now)
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> {
+ assert(_ctx == null) { "Source running concurrently" }
+ _ctx = conn
+ updateCapacity()
+ }
+ FlowEvent.Exit -> {
+ _ctx = null
+ updateCapacity()
+ }
+ FlowEvent.Capacity -> updateCapacity()
+ else -> {}
+ }
+ }
+
+ override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
index bf76711f..d9779c6a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -20,41 +20,37 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
import kotlin.math.roundToLong
/**
- * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
*/
-public class SimWorkConsumer(
- private val work: Double,
- private val utilization: Double
-) : SimResourceConsumer {
+public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
init {
- require(work >= 0.0) { "Work must be positive" }
+ require(amount >= 0.0) { "Amount must be positive" }
require(utilization > 0.0) { "Utilization must be positive" }
}
- private var remainingWork = work
+ private var remainingAmount = amount
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val actualWork = ctx.speed * delta / 1000.0
- val limit = ctx.capacity * utilization
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumed = conn.rate * delta / 1000.0
+ val limit = conn.capacity * utilization
- remainingWork -= actualWork
+ remainingAmount -= consumed
- val remainingWork = remainingWork
- val duration = (remainingWork / limit * 1000).roundToLong()
+ val duration = (remainingAmount / limit * 1000).roundToLong()
return if (duration > 0) {
- ctx.push(limit)
+ conn.push(limit)
duration
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
index 52a42241..b3191ad3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
@@ -20,13 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
/**
- * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to
- * complete, before proceeding its operation.
+ * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
+ * finish a pull, before proceeding its operation.
*/
-public class SimConsumerBarrier(public val parties: Int) {
+public class FlowSourceBarrier(public val parties: Int) {
private var counter = 0
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 46885640..7fcc0405 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -20,20 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
import kotlin.math.min
/**
* Helper class to expose an observable [speed] field describing the speed of the consumer.
*/
-public class SimSpeedConsumerAdapter(
- private val delegate: SimResourceConsumer,
+public class FlowSourceRateAdapter(
+ private val delegate: FlowSource,
private val callback: (Double) -> Unit = {}
-) : SimResourceConsumer by delegate {
+) : FlowSource by delegate {
/**
* The resource processing speed at this instant.
*/
@@ -49,34 +49,34 @@ public class SimSpeedConsumerAdapter(
callback(0.0)
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return delegate.onNext(ctx, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
val oldSpeed = speed
- delegate.onEvent(ctx, event)
+ delegate.onEvent(conn, now, event)
when (event) {
- SimResourceEvent.Run -> speed = ctx.speed
- SimResourceEvent.Capacity -> {
+ FlowEvent.Converge -> speed = conn.rate
+ FlowEvent.Capacity -> {
// Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
// need to update the current speed.
if (oldSpeed == speed) {
- speed = min(ctx.capacity, speed)
+ speed = min(conn.capacity, speed)
}
}
- SimResourceEvent.Exit -> speed = 0.0
+ FlowEvent.Exit -> speed = 0.0
else -> {}
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
speed = 0.0
- delegate.onFailure(ctx, cause)
+ delegate.onFailure(conn, cause)
}
- override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
+ override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
index 4c0e075c..4d3ae61a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -20,21 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
/**
- * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
- * consumption for some period of time.
+ * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
*/
-public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
+public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
private var _iterator: Iterator<Fragment>? = null
private var _nextTarget = Long.MIN_VALUE
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
// Check whether the trace fragment was fully consumed, otherwise wait until we have done so
val nextTarget = _nextTarget
if (nextTarget > now) {
@@ -45,21 +44,21 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
return if (iterator.hasNext()) {
val fragment = iterator.next()
_nextTarget = now + fragment.duration
- ctx.push(fragment.usage)
+ conn.push(fragment.usage)
fragment.duration
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> {
- check(_iterator == null) { "Consumer already running" }
+ FlowEvent.Start -> {
+ check(_iterator == null) { "Source already running" }
_iterator = trace.iterator()
}
- SimResourceEvent.Exit -> {
+ FlowEvent.Exit -> {
_iterator = null
}
else -> {}
@@ -67,7 +66,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
}
/**
- * A fragment of the workload.
+ * A fragment of the tgrace.
*/
public data class Fragment(val duration: Long, val usage: Double)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
new file mode 100644
index 00000000..061ebea6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+
+/**
+ * A test suite for the [FlowConsumerContextImpl] class.
+ */
+class FlowConsumerContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ engine.scheduleSync(engine.clock.millis(), context)
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = FixedFlowSource(1.0, 1.0)
+
+ val logic = spyk(object : FlowConsumerLogic {})
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 1.0
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ engine.scheduleSync(engine.clock.millis(), context)
+
+ verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) }
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(0.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
+ }
+
+ @Test
+ fun testIdempotentCapacityChange() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 4200.0
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ }
+
+ @Test
+ fun testFailureNoInfiniteLoop() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Exit) throw IllegalStateException("onEvent")
+ }
+
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ throw IllegalStateException("onFailure")
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ delay(1)
+
+ verify(exactly = 1) { consumer.onFailure(any(), any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 49e60f68..cbc48a4e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import io.mockk.spyk
import io.mockk.verify
@@ -29,24 +29,24 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
- * A test suite for the [SimResourceForwarder] class.
+ * A test suite for the [FlowForwarder] class.
*/
-internal class SimResourceForwarderTest {
+internal class FlowForwarderTest {
@Test
fun testCancelImmediately() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
launch { source.consume(forwarder) }
- forwarder.consume(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
})
@@ -57,22 +57,22 @@ internal class SimResourceForwarderTest {
@Test
fun testCancel() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
launch { source.consume(forwarder) }
- forwarder.consume(object : SimResourceConsumer {
+ forwarder.consume(object : FlowSource {
var isFirst = true
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
10 * 1000
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
@@ -84,10 +84,11 @@ internal class SimResourceForwarderTest {
@Test
fun testState() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
}
@@ -108,11 +109,12 @@ internal class SimResourceForwarderTest {
@Test
fun testCancelPendingDelegate() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
})
@@ -120,16 +122,16 @@ internal class SimResourceForwarderTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
- val consumer = spyk(SimWorkConsumer(2000.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
source.startConsumer(forwarder)
yield()
@@ -137,17 +139,17 @@ internal class SimResourceForwarderTest {
yield()
forwarder.cancel()
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testCancelPropagation() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
- val consumer = spyk(SimWorkConsumer(2000.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
source.startConsumer(forwarder)
yield()
@@ -155,19 +157,19 @@ internal class SimResourceForwarderTest {
yield()
source.cancel()
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testExitPropagation() = runBlockingSimulation {
- val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
}
@@ -181,11 +183,11 @@ internal class SimResourceForwarderTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(1.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
- val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
source.startConsumer(forwarder)
coroutineScope {
@@ -195,16 +197,16 @@ internal class SimResourceForwarderTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
}
@Test
fun testCounters() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(1.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
- val consumer = SimWorkConsumer(2.0, 1.0)
+ val consumer = FixedFlowSource(2.0, 1.0)
source.startConsumer(forwarder)
forwarder.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index e055daf7..010a985e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import io.mockk.every
import io.mockk.mockk
@@ -30,24 +30,24 @@ import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
/**
- * A test suite for the [SimResourceSource] class.
+ * A test suite for the [FlowSink] class.
*/
-internal class SimResourceSourceTest {
+internal class FlowSinkTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(4200.0, 1.0)
+ val consumer = FixedFlowSource(4200.0, 1.0)
val res = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
provider.consume(adapter)
@@ -56,10 +56,10 @@ internal class SimResourceSourceTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val provider = SimResourceSource(1.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(engine, 1.0)
- val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
coroutineScope {
launch { provider.consume(consumer) }
@@ -67,19 +67,19 @@ internal class SimResourceSourceTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
}
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(capacity, 2.0)
+ val consumer = FixedFlowSource(capacity, 2.0)
val res = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
provider.consume(adapter)
@@ -87,23 +87,23 @@ internal class SimResourceSourceTest {
}
/**
- * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
- * [SimResourceConsumer.onNext].
+ * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
+ * [FlowSource.onPull].
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- ctx.interrupt()
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ conn.pull()
}
}
@@ -112,28 +112,28 @@ internal class SimResourceSourceTest {
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
- lateinit var resCtx: SimResourceContext
+ val provider = FlowSink(engine, capacity)
+ lateinit var resCtx: FlowConnection
- val consumer = object : SimResourceConsumer {
+ val consumer = object : FlowSource {
var isFirst = true
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> resCtx = ctx
+ FlowEvent.Start -> resCtx = conn
else -> {}
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
4000
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
@@ -141,7 +141,7 @@ internal class SimResourceSourceTest {
launch {
yield()
- resCtx.interrupt()
+ resCtx.pull()
}
provider.consume(consumer)
@@ -150,12 +150,12 @@ internal class SimResourceSourceTest {
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
+ val consumer = mockk<FlowSource>(relaxUnitFun = true)
+ every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) }
.throws(IllegalStateException())
assertThrows<IllegalStateException> {
@@ -165,17 +165,17 @@ internal class SimResourceSourceTest {
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = object : SimResourceConsumer {
+ val consumer = object : FlowSource {
var isFirst = true
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
1000
} else {
throw IllegalStateException()
@@ -190,11 +190,11 @@ internal class SimResourceSourceTest {
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(capacity, 1.0)
+ val consumer = FixedFlowSource(capacity, 1.0)
assertThrows<IllegalStateException> {
coroutineScope {
@@ -206,11 +206,11 @@ internal class SimResourceSourceTest {
@Test
fun testCancelDuringConsumption() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(capacity, 1.0)
+ val consumer = FixedFlowSource(capacity, 1.0)
launch { provider.consume(consumer) }
delay(500)
@@ -225,12 +225,12 @@ internal class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
}
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
index 49f2da5f..b503087e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.mux
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
@@ -28,13 +28,14 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+import org.opendc.simulator.flow.source.TraceFlowSource
/**
- * Test suite for the [SimResourceSwitchExclusive] class.
+ * Test suite for the [ForwardingFlowMultiplexer] class.
*/
internal class SimResourceSwitchExclusiveTest {
/**
@@ -42,29 +43,29 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val speed = mutableListOf<Double>()
val duration = 5 * 60L
val workload =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3500.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
),
)
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
- val forwarder = SimResourceForwarder()
- val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+ val forwarder = FlowForwarder(engine)
+ val adapter = FlowSourceRateAdapter(forwarder, speed::add)
source.startConsumer(adapter)
- switch.addInput(forwarder)
+ switch.addOutput(forwarder)
- val provider = switch.newOutput()
+ val provider = switch.newInput()
provider.consume(workload)
yield()
@@ -79,17 +80,17 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = SimWorkConsumer(duration * 3.2, 1.0)
+ val workload = FixedFlowSource(duration * 3.2, 1.0)
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
- switch.addInput(source)
+ switch.addOutput(source)
- val provider = switch.newOutput()
+ val provider = switch.newInput()
provider.consume(workload)
yield()
@@ -101,37 +102,37 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer {
+ val workload = object : FlowSource {
var isFirst = true
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> isFirst = true
+ FlowEvent.Start -> isFirst = true
else -> {}
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
duration
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
}
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
- switch.addInput(source)
+ switch.addOutput(source)
- val provider = switch.newOutput()
+ val provider = switch.newInput()
provider.consume(workload)
yield()
provider.consume(workload)
@@ -143,14 +144,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
- switch.addInput(source)
+ switch.addOutput(source)
- switch.newOutput()
- assertThrows<IllegalStateException> { switch.newOutput() }
+ switch.newInput()
+ assertThrows<IllegalStateException> { switch.newInput() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
index 03f90e21..089a8d78 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.mux
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -28,24 +28,26 @@ import kotlinx.coroutines.yield
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.TraceFlowSource
/**
- * Test suite for the [SimResourceSwitch] implementations
+ * Test suite for the [FlowMultiplexer] implementations
*/
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin(scheduler)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(scheduler)
- val sources = List(2) { SimResourceSource(2000.0, scheduler) }
- sources.forEach { switch.addInput(it) }
+ val sources = List(2) { FlowSink(scheduler, 2000.0) }
+ sources.forEach { switch.addOutput(it) }
- val provider = switch.newOutput()
- val consumer = SimWorkConsumer(2000.0, 1.0)
+ val provider = switch.newInput()
+ val consumer = FixedFlowSource(2000.0, 1.0)
try {
provider.consume(consumer)
@@ -60,24 +62,24 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L
val workload =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3500.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
),
)
- val switch = SimResourceSwitchMaxMin(scheduler)
- val provider = switch.newOutput()
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val provider = switch.newInput()
try {
- switch.addInput(SimResourceSource(3200.0, scheduler))
+ switch.addOutput(FlowSink(scheduler, 3200.0))
provider.consume(workload)
yield()
} finally {
@@ -97,34 +99,34 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L
val workloadA =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3500.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
),
)
val workloadB =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3100.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3100.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 73.0)
)
)
- val switch = SimResourceSwitchMaxMin(scheduler)
- val providerA = switch.newOutput()
- val providerB = switch.newOutput()
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val providerA = switch.newInput()
+ val providerB = switch.newInput()
try {
- switch.addInput(SimResourceSource(3200.0, scheduler))
+ switch.addOutput(FlowSink(scheduler, 3200.0))
coroutineScope {
launch { providerA.consume(workloadA) }
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
index 830f16d3..8396d346 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
@@ -20,24 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.source
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
/**
- * A test suite for the [SimWorkConsumer] class.
+ * A test suite for the [FixedFlowSource] class.
*/
-internal class SimWorkConsumerTest {
+internal class FixedFlowSourceTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val provider = SimResourceSource(1.0, scheduler)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
- val consumer = SimWorkConsumer(1.0, 1.0)
+ val consumer = FixedFlowSource(1.0, 1.0)
provider.consume(consumer)
assertEquals(1000, clock.millis())
@@ -45,10 +46,10 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val provider = SimResourceSource(1.0, scheduler)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
- val consumer = SimWorkConsumer(1.0, 0.5)
+ val consumer = FixedFlowSource(1.0, 0.5)
provider.consume(consumer)
assertEquals(2000, clock.millis())
diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts
index eb9adcd1..a8f94602 100644
--- a/opendc-simulator/opendc-simulator-network/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts
@@ -30,6 +30,6 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcSimulator.opendcSimulatorResources)
+ api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt
index 102e5625..4b66d5cf 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.network
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowSource
/**
* A network port allows network devices to be connected to network through links.
@@ -78,14 +78,14 @@ public abstract class SimNetworkPort {
}
/**
- * Create a [SimResourceConsumer] which generates the outgoing traffic of this port.
+ * Create a [FlowSource] which generates the outgoing traffic of this port.
*/
- protected abstract fun createConsumer(): SimResourceConsumer
+ protected abstract fun createConsumer(): FlowSource
/**
- * The [SimResourceProvider] which processes the ingoing traffic of this port.
+ * The [FlowConsumer] which processes the ingoing traffic of this port.
*/
- protected abstract val provider: SimResourceProvider
+ protected abstract val provider: FlowConsumer
override fun toString(): String = "SimNetworkPort[isConnected=$isConnected]"
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
index 7db0f176..4b0d7bbd 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
@@ -22,22 +22,22 @@
package org.opendc.simulator.network
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
/**
* A network sink which discards all received traffic and does not generate any traffic itself.
*/
public class SimNetworkSink(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
public val capacity: Double
) : SimNetworkPort() {
- override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun createConsumer(): FlowSource = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
override fun toString(): String = "SimNetworkSink.Consumer"
}
- override val provider: SimResourceProvider = SimResourceSource(capacity, interpreter)
+ override val provider: FlowConsumer = FlowSink(engine, capacity)
override fun toString(): String = "SimNetworkSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
index 2267715e..2b7c1ad7 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
@@ -22,12 +22,13 @@
package org.opendc.simulator.network
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A [SimNetworkSwitch] that can support new networking ports on demand.
*/
-public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimNetworkSwitch {
+public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetworkSwitch {
/**
* The ports of this switch.
*/
@@ -36,9 +37,9 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN
private val _ports = mutableListOf<Port>()
/**
- * The [SimResourceSwitchMaxMin] to actually perform the switching.
+ * The [MaxMinFlowMultiplexer] to actually perform the switching.
*/
- private val switch = SimResourceSwitchMaxMin(interpreter)
+ private val mux = MaxMinFlowMultiplexer(engine)
/**
* Open a new port on the switch.
@@ -58,19 +59,19 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN
*/
private var isClosed: Boolean = false
- override val provider: SimResourceProvider
+ override val provider: FlowConsumer
get() = _provider
- private val _provider = switch.newOutput()
+ private val _provider = mux.newInput()
- override fun createConsumer(): SimResourceConsumer {
- val forwarder = SimResourceForwarder(isCoupled = true)
- switch.addInput(forwarder)
+ override fun createConsumer(): FlowSource {
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ mux.addOutput(forwarder)
return forwarder
}
override fun close() {
isClosed = true
- switch.removeOutput(_provider)
+ mux.removeInput(_provider)
_ports.remove(this)
}
}
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index b8c4b00d..45d0bcf0 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -31,8 +31,8 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimNetworkSink] class.
@@ -40,8 +40,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
class SimNetworkSinkTest {
@Test
fun testInitialState() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
assertFalse(sink.isConnected)
assertNull(sink.link)
@@ -50,8 +50,8 @@ class SimNetworkSinkTest {
@Test
fun testDisconnectIdempotent() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
assertDoesNotThrow { sink.disconnect() }
assertFalse(sink.isConnected)
@@ -59,8 +59,8 @@ class SimNetworkSinkTest {
@Test
fun testConnectCircular() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
assertThrows<IllegalArgumentException> {
sink.connect(sink)
@@ -69,8 +69,8 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnectedTarget() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
val source = mockk<SimNetworkPort>(relaxUnitFun = true)
every { source.isConnected } returns true
@@ -81,9 +81,9 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnected() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source1 = Source(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source1 = Source(engine)
val source2 = mockk<SimNetworkPort>(relaxUnitFun = true)
@@ -97,9 +97,9 @@ class SimNetworkSinkTest {
@Test
fun testConnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source = spyk(Source(interpreter))
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source = spyk(Source(engine))
val consumer = source.consumer
sink.connect(source)
@@ -108,14 +108,14 @@ class SimNetworkSinkTest {
assertTrue(source.isConnected)
verify { source.createConsumer() }
- verify { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Start) }
}
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source = spyk(Source(interpreter))
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source = spyk(Source(engine))
val consumer = source.consumer
sink.connect(source)
@@ -124,14 +124,14 @@ class SimNetworkSinkTest {
assertFalse(sink.isConnected)
assertFalse(source.isConnected)
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
- private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() {
- val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8))
+ private class Source(engine: FlowEngine) : SimNetworkPort() {
+ val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8))
- public override fun createConsumer(): SimResourceConsumer = consumer
+ public override fun createConsumer(): FlowSource = consumer
- override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter)
+ override val provider: FlowConsumer = FlowSink(engine, 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
index 3a749bfe..4aa2fa92 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
@@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimNetworkSwitchVirtual] class.
@@ -37,10 +37,10 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
class SimNetworkSwitchVirtualTest {
@Test
fun testConnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source = spyk(Source(interpreter))
- val switch = SimNetworkSwitchVirtual(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source = spyk(Source(engine))
+ val switch = SimNetworkSwitchVirtual(engine)
val consumer = source.consumer
switch.newPort().connect(sink)
@@ -50,14 +50,14 @@ class SimNetworkSwitchVirtualTest {
assertTrue(source.isConnected)
verify { source.createConsumer() }
- verify { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Start) }
}
@Test
fun testConnectClosedPort() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val switch = SimNetworkSwitchVirtual(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val switch = SimNetworkSwitchVirtual(engine)
val port = switch.newPort()
port.close()
@@ -67,11 +67,11 @@ class SimNetworkSwitchVirtualTest {
}
}
- private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() {
- val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8))
+ private class Source(engine: FlowEngine) : SimNetworkPort() {
+ val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8))
- public override fun createConsumer(): SimResourceConsumer = consumer
+ public override fun createConsumer(): FlowSource = consumer
- override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter)
+ override val provider: FlowConsumer = FlowSink(engine, 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts
index f2a49964..e4342a6a 100644
--- a/opendc-simulator/opendc-simulator-power/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts
@@ -30,6 +30,6 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcSimulator.opendcSimulatorResources)
+ api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index 1a12a52a..c33f5186 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -22,46 +22,48 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A model of a Power Distribution Unit (PDU).
*
- * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood.
+ * @param engine The underlying [FlowEngine] to drive the simulation under the hood.
* @param idlePower The idle power consumption of the PDU independent of the load on the PDU.
* @param lossCoefficient The coefficient for the power loss of the PDU proportional to the square load.
*/
public class SimPdu(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
private val idlePower: Double = 0.0,
private val lossCoefficient: Double = 0.0,
) : SimPowerInlet() {
/**
- * The [SimResourceSwitch] that distributes the electricity over the PDU outlets.
+ * The [FlowMultiplexer] that distributes the electricity over the PDU outlets.
*/
- private val switch = SimResourceSwitchMaxMin(interpreter)
+ private val mux = MaxMinFlowMultiplexer(engine)
/**
- * The [SimResourceForwarder] that represents the input of the PDU.
+ * The [FlowForwarder] that represents the input of the PDU.
*/
- private val forwarder = SimResourceForwarder()
+ private val forwarder = FlowForwarder(engine)
/**
* Create a new PDU outlet.
*/
- public fun newOutlet(): Outlet = Outlet(switch, switch.newOutput())
+ public fun newOutlet(): Outlet = Outlet(mux, mux.newInput())
init {
- switch.addInput(forwarder)
+ mux.addOutput(forwarder)
}
- override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by forwarder {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val duration = forwarder.onNext(ctx, now, delta)
- val loss = computePowerLoss(ctx.demand)
- val newLimit = ctx.demand + loss
+ override fun createConsumer(): FlowSource = object : FlowSource by forwarder {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val duration = forwarder.onPull(conn, now, delta)
+ val loss = computePowerLoss(conn.demand)
+ val newLimit = conn.demand + loss
- ctx.push(newLimit)
+ conn.push(newLimit)
return duration
}
@@ -81,7 +83,7 @@ public class SimPdu(
/**
* A PDU outlet.
*/
- public class Outlet(private val switch: SimResourceSwitch, private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable {
+ public class Outlet(private val switch: FlowMultiplexer, private val provider: FlowConsumer) : SimPowerOutlet(), AutoCloseable {
override fun onConnect(inlet: SimPowerInlet) {
provider.startConsumer(inlet.createConsumer())
}
@@ -94,7 +96,7 @@ public class SimPdu(
* Remove the outlet from the PDU.
*/
override fun close() {
- switch.removeOutput(provider)
+ switch.removeInput(provider)
}
override fun toString(): String = "SimPdu.Outlet"
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt
index 0ac1f199..851b28a5 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.flow.FlowSource
/**
* An abstract inlet that consumes electricity from a power outlet.
@@ -42,7 +42,7 @@ public abstract class SimPowerInlet {
internal var _outlet: SimPowerOutlet? = null
/**
- * Create a [SimResourceConsumer] which represents the consumption of electricity from the power outlet.
+ * Create a [FlowSource] which represents the consumption of electricity from the power outlet.
*/
- public abstract fun createConsumer(): SimResourceConsumer
+ public abstract fun createConsumer(): FlowSource
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt
index 3ef8ccc6..7faebd75 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt
@@ -22,25 +22,25 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSource
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSink
/**
* A [SimPowerOutlet] that represents a source of electricity.
*
- * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood.
+ * @param engine The underlying [FlowEngine] to drive the simulation under the hood.
*/
-public class SimPowerSource(interpreter: SimResourceInterpreter, public val capacity: Double) : SimPowerOutlet() {
+public class SimPowerSource(engine: FlowEngine, public val capacity: Double) : SimPowerOutlet() {
/**
* The resource source that drives this power source.
*/
- private val source = SimResourceSource(capacity, interpreter)
+ private val source = FlowSink(engine, capacity)
/**
* The power draw at this instant.
*/
public val powerDraw: Double
- get() = source.speed
+ get() = source.rate
override fun onConnect(inlet: SimPowerInlet) {
source.startConsumer(inlet.createConsumer())
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
index 9c7400ed..5eaa91af 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
@@ -22,50 +22,51 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A model of an Uninterruptible Power Supply (UPS).
*
* This model aggregates multiple power sources into a single source in order to ensure that power is always available.
*
- * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood.
+ * @param engine The underlying [FlowEngine] to drive the simulation under the hood.
* @param idlePower The idle power consumption of the UPS independent of the load.
* @param lossCoefficient The coefficient for the power loss of the UPS proportional to the load.
*/
public class SimUps(
- interpreter: SimResourceInterpreter,
+ private val engine: FlowEngine,
private val idlePower: Double = 0.0,
private val lossCoefficient: Double = 0.0,
) : SimPowerOutlet() {
/**
* The resource aggregator used to combine the input sources.
*/
- private val switch = SimResourceSwitchMaxMin(interpreter)
+ private val switch = MaxMinFlowMultiplexer(engine)
/**
- * The [SimResourceProvider] that represents the output of the UPS.
+ * The [FlowConsumer] that represents the output of the UPS.
*/
- private val provider = switch.newOutput()
+ private val provider = switch.newInput()
/**
* Create a new UPS outlet.
*/
public fun newInlet(): SimPowerInlet {
- val forward = SimResourceForwarder(isCoupled = true)
- switch.addInput(forward)
+ val forward = FlowForwarder(engine, isCoupled = true)
+ switch.addOutput(forward)
return Inlet(forward)
}
override fun onConnect(inlet: SimPowerInlet) {
val consumer = inlet.createConsumer()
- provider.startConsumer(object : SimResourceConsumer by consumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val duration = consumer.onNext(ctx, now, delta)
- val loss = computePowerLoss(ctx.demand)
- val newLimit = ctx.demand + loss
+ provider.startConsumer(object : FlowSource by consumer {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val duration = consumer.onPull(conn, now, delta)
+ val loss = computePowerLoss(conn.demand)
+ val newLimit = conn.demand + loss
- ctx.push(newLimit)
+ conn.push(newLimit)
return duration
}
})
@@ -86,8 +87,8 @@ public class SimUps(
/**
* A UPS inlet.
*/
- public inner class Inlet(private val forwarder: SimResourceForwarder) : SimPowerInlet(), AutoCloseable {
- override fun createConsumer(): SimResourceConsumer = forwarder
+ public inner class Inlet(private val forwarder: FlowForwarder) : SimPowerInlet(), AutoCloseable {
+ override fun createConsumer(): FlowSource = forwarder
/**
* Remove the inlet from the PSU.
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index 17a174b7..568a1e8c 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceEvent
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimPdu] class.
@@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimPduTest {
@Test
fun testZeroOutlets() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
assertEquals(0.0, source.powerDraw)
@@ -49,9 +49,9 @@ internal class SimPduTest {
@Test
fun testSingleOutlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
pdu.newOutlet().connect(SimpleInlet())
@@ -60,9 +60,9 @@ internal class SimPduTest {
@Test
fun testDoubleOutlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
pdu.newOutlet().connect(SimpleInlet())
@@ -73,28 +73,28 @@ internal class SimPduTest {
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
- val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0))
+ val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0))
val inlet = object : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = consumer
+ override fun createConsumer(): FlowSource = consumer
}
val outlet = pdu.newOutlet()
outlet.connect(inlet)
outlet.disconnect()
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testLoss() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
- val pdu = SimPdu(interpreter, idlePower = 1.5, lossCoefficient = 0.015)
+ val pdu = SimPdu(engine, idlePower = 1.5, lossCoefficient = 0.015)
source.connect(pdu)
pdu.newOutlet().connect(SimpleInlet())
assertEquals(89.0, source.powerDraw, 0.01)
@@ -102,9 +102,9 @@ internal class SimPduTest {
@Test
fun testOutletClose() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
val outlet = pdu.newOutlet()
outlet.close()
@@ -115,6 +115,6 @@ internal class SimPduTest {
}
class SimpleInlet : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5)
+ override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index f3829ba1..b411e292 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -31,10 +31,10 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceEvent
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimPowerSource]
@@ -42,8 +42,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimPowerSourceTest {
@Test
fun testInitialState() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
assertFalse(source.isConnected)
assertNull(source.inlet)
@@ -52,8 +52,8 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectIdempotent() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
assertDoesNotThrow { source.disconnect() }
assertFalse(source.isConnected)
@@ -61,8 +61,8 @@ internal class SimPowerSourceTest {
@Test
fun testConnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = SimpleInlet()
source.connect(inlet)
@@ -76,27 +76,27 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0))
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0))
val inlet = object : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = consumer
+ override fun createConsumer(): FlowSource = consumer
}
source.connect(inlet)
source.disconnect()
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testDisconnectAssertion() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
every { inlet.isConnected } returns false
every { inlet._outlet } returns null
- every { inlet.createConsumer() } returns SimWorkConsumer(100.0, utilization = 1.0)
+ every { inlet.createConsumer() } returns FixedFlowSource(100.0, utilization = 1.0)
source.connect(inlet)
@@ -107,8 +107,8 @@ internal class SimPowerSourceTest {
@Test
fun testOutletAlreadyConnected() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = SimpleInlet()
source.connect(inlet)
@@ -121,8 +121,8 @@ internal class SimPowerSourceTest {
@Test
fun testInletAlreadyConnected() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
every { inlet.isConnected } returns true
@@ -132,6 +132,6 @@ internal class SimPowerSourceTest {
}
class SimpleInlet : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 1.0)
+ override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 1.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index 8d5fa857..31ac0b39 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceEvent
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimUps] class.
@@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimUpsTest {
@Test
fun testSingleInlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val ups = SimUps(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val ups = SimUps(engine)
source.connect(ups.newInlet())
ups.connect(SimpleInlet())
@@ -50,10 +50,10 @@ internal class SimUpsTest {
@Test
fun testDoubleInlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source1 = SimPowerSource(interpreter, capacity = 100.0)
- val source2 = SimPowerSource(interpreter, capacity = 100.0)
- val ups = SimUps(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source1 = SimPowerSource(engine, capacity = 100.0)
+ val source2 = SimPowerSource(engine, capacity = 100.0)
+ val ups = SimUps(engine)
source1.connect(ups.newInlet())
source2.connect(ups.newInlet())
@@ -67,10 +67,10 @@ internal class SimUpsTest {
@Test
fun testLoss() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
- val ups = SimUps(interpreter, idlePower = 4.0, lossCoefficient = 0.05)
+ val ups = SimUps(engine, idlePower = 4.0, lossCoefficient = 0.05)
source.connect(ups.newInlet())
ups.connect(SimpleInlet())
@@ -79,24 +79,24 @@ internal class SimUpsTest {
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source1 = SimPowerSource(interpreter, capacity = 100.0)
- val source2 = SimPowerSource(interpreter, capacity = 100.0)
- val ups = SimUps(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source1 = SimPowerSource(engine, capacity = 100.0)
+ val source2 = SimPowerSource(engine, capacity = 100.0)
+ val ups = SimUps(engine)
source1.connect(ups.newInlet())
source2.connect(ups.newInlet())
- val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0))
+ val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0))
val inlet = object : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = consumer
+ override fun createConsumer(): FlowSource = consumer
}
ups.connect(inlet)
ups.disconnect()
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
class SimpleInlet : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5)
+ override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
deleted file mode 100644
index b406b896..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A controllable [SimResourceContext].
- *
- * This interface is used by resource providers to control the resource context.
- */
-public interface SimResourceControllableContext : SimResourceContext {
- /**
- * The capacity of the resource.
- */
- public override var capacity: Double
-
- /**
- * Start the resource context.
- */
- public fun start()
-
- /**
- * Invalidate the resource context's state.
- *
- * By invalidating the resource context's current state, the state is re-computed and the current progress is
- * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the
- * synchronous variant.
- */
- public fun invalidate()
-
- /**
- * Synchronously flush the progress of the resource context and materialize its current progress.
- */
- public fun flush()
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
deleted file mode 100644
index f1e004d2..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.simulator.resources.interference.InterferenceKey
-import java.util.ArrayDeque
-
-/**
- * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
- * a single output is directly connected to an input and that the switch can only support as many outputs as inputs.
- */
-public class SimResourceSwitchExclusive : SimResourceSwitch {
- override val outputs: Set<SimResourceProvider>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
-
- private val _inputs = mutableSetOf<SimResourceProvider>()
- override val inputs: Set<SimResourceProvider>
- get() = _inputs
- private val _availableInputs = ArrayDeque<SimResourceForwarder>()
-
- override val counters: SimResourceCounters = object : SimResourceCounters {
- override val demand: Double
- get() = _inputs.sumOf { it.counters.demand }
- override val actual: Double
- get() = _inputs.sumOf { it.counters.actual }
- override val overcommit: Double
- get() = _inputs.sumOf { it.counters.overcommit }
- override val interference: Double
- get() = _inputs.sumOf { it.counters.interference }
-
- override fun reset() {
- for (input in _inputs) {
- input.counters.reset()
- }
- }
-
- override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
- }
-
- /**
- * Add an output to the switch.
- */
- override fun newOutput(key: InterferenceKey?): SimResourceProvider {
- val forwarder = checkNotNull(_availableInputs.poll()) { "No capacity to serve request" }
- val output = Output(forwarder)
- _outputs += output
- return output
- }
-
- override fun removeOutput(output: SimResourceProvider) {
- if (!_outputs.remove(output)) {
- return
- }
-
- (output as Output).close()
- }
-
- /**
- * Add an input to the switch.
- */
- override fun addInput(input: SimResourceProvider) {
- if (input in inputs) {
- return
- }
-
- val forwarder = SimResourceForwarder()
-
- _inputs += input
- _availableInputs += forwarder
-
- input.startConsumer(object : SimResourceConsumer by forwarder {
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- if (event == SimResourceEvent.Exit) {
- // De-register the input after it has finished
- _inputs -= input
- }
-
- forwarder.onEvent(ctx, event)
- }
- })
- }
-
- override fun clear() {
- for (input in _inputs) {
- input.cancel()
- }
- _inputs.clear()
-
- // Outputs are implicitly cancelled by the inputs forwarders
- _outputs.clear()
- }
-
- /**
- * An output of the resource switch.
- */
- private inner class Output(private val forwarder: SimResourceForwarder) : SimResourceProvider by forwarder {
- /**
- * Close the output.
- */
- fun close() {
- // We explicitly do not close the forwarder here in order to re-use it across output resources.
- _outputs -= this
- _availableInputs += forwarder
- }
-
- override fun toString(): String = "SimResourceSwitchExclusive.Output"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
deleted file mode 100644
index 574fb443..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.simulator.resources.impl.SimResourceCountersImpl
-import org.opendc.simulator.resources.interference.InterferenceDomain
-import org.opendc.simulator.resources.interference.InterferenceKey
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
- * fair sharing.
- *
- * @param interpreter The interpreter for managing the resource contexts.
- * @param parent The parent resource system of the switch.
- * @param interferenceDomain The interference domain of the switch.
- */
-public class SimResourceSwitchMaxMin(
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
- private val interferenceDomain: InterferenceDomain? = null
-) : SimResourceSwitch {
- /**
- * The output resource providers to which resource consumers can be attached.
- */
- override val outputs: Set<SimResourceProvider>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
- private val _activeOutputs: MutableList<Output> = mutableListOf()
-
- /**
- * The input resources that will be switched between the output providers.
- */
- override val inputs: Set<SimResourceProvider>
- get() = _inputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
- private val _activeInputs = mutableListOf<Input>()
-
- /**
- * The resource counters of this switch.
- */
- public override val counters: SimResourceCounters
- get() = _counters
- private val _counters = SimResourceCountersImpl()
-
- /**
- * The actual processing rate of the switch.
- */
- private var _rate = 0.0
-
- /**
- * The demanded processing rate of the outputs.
- */
- private var _demand = 0.0
-
- /**
- * The capacity of the switch.
- */
- private var _capacity = 0.0
-
- /**
- * Flag to indicate that the scheduler is active.
- */
- private var _schedulerActive = false
-
- /**
- * Add an output to the switch.
- */
- override fun newOutput(key: InterferenceKey?): SimResourceProvider {
- val provider = Output(_capacity, key)
- _outputs.add(provider)
- return provider
- }
-
- /**
- * Add the specified [input] to the switch.
- */
- override fun addInput(input: SimResourceProvider) {
- val consumer = Input(input)
- if (_inputs.add(input)) {
- _activeInputs.add(consumer)
- input.startConsumer(consumer)
- }
- }
-
- /**
- * Remove [output] from this switch.
- */
- override fun removeOutput(output: SimResourceProvider) {
- if (!_outputs.remove(output)) {
- return
- }
- // This cast should always succeed since only `Output` instances should be added to _outputs
- (output as Output).close()
- }
-
- override fun clear() {
- for (input in _activeInputs) {
- input.cancel()
- }
- _activeInputs.clear()
-
- for (output in _activeOutputs) {
- output.cancel()
- }
- _activeOutputs.clear()
- }
-
- /**
- * Run the scheduler of the switch.
- */
- private fun runScheduler(now: Long) {
- if (_schedulerActive) {
- return
- }
-
- _schedulerActive = true
- try {
- doSchedule(now)
- } finally {
- _schedulerActive = false
- }
- }
-
- /**
- * Schedule the outputs over the input.
- */
- private fun doSchedule(now: Long) {
- // If there is no work yet, mark the input as idle.
- if (_activeOutputs.isEmpty()) {
- return
- }
-
- val capacity = _capacity
- var availableCapacity = capacity
-
- // Pull in the work of the outputs
- val outputIterator = _activeOutputs.listIterator()
- for (output in outputIterator) {
- output.pull(now)
-
- // Remove outputs that have finished
- if (!output.isActive) {
- outputIterator.remove()
- }
- }
-
- var demand = 0.0
-
- // Sort in-place the outputs based on their requested usage.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- _activeOutputs.sort()
-
- // Divide the available input capacity fairly across the outputs using max-min fair sharing
- var remaining = _activeOutputs.size
- for (output in _activeOutputs) {
- val availableShare = availableCapacity / remaining--
- val grantedSpeed = min(output.allowedRate, availableShare)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0) {
- output.actualRate = 0.0
- continue
- }
-
- demand += output.limit
-
- output.actualRate = grantedSpeed
- availableCapacity -= grantedSpeed
- }
-
- val rate = capacity - availableCapacity
-
- _demand = demand
- _rate = rate
-
- // Sort all consumers by their capacity
- _activeInputs.sort()
-
- // Divide the requests over the available capacity of the input resources fairly
- for (input in _activeInputs) {
- val inputCapacity = input.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- input.push(grantedSpeed)
- }
- }
-
- /**
- * Recompute the capacity of the switch.
- */
- private fun updateCapacity() {
- val newCapacity = _activeInputs.sumOf(Input::capacity)
-
- // No-op if the capacity is unchanged
- if (_capacity == newCapacity) {
- return
- }
-
- _capacity = newCapacity
-
- for (output in _outputs) {
- output.capacity = newCapacity
- }
- }
-
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class Output(capacity: Double, val key: InterferenceKey?) :
- SimAbstractResourceProvider(interpreter, capacity),
- SimResourceProviderLogic,
- Comparable<Output> {
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- @JvmField var actualRate: Double = 0.0
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- val allowedRate: Double
- get() = min(capacity, limit)
-
- /**
- * A flag to indicate that the output is closed.
- */
- private var _isClosed: Boolean = false
-
- /**
- * The timestamp at which we received the last command.
- */
- private var _lastPull: Long = Long.MIN_VALUE
-
- /**
- * Close the output.
- *
- * This method is invoked when the user removes an output from the switch.
- */
- fun close() {
- _isClosed = true
- cancel()
- }
-
- /* SimAbstractResourceProvider */
- override fun createLogic(): SimResourceProviderLogic = this
-
- override fun start(ctx: SimResourceControllableContext) {
- check(!_isClosed) { "Cannot re-use closed output" }
-
- _activeOutputs += this
- super.start(ctx)
- }
-
- /* SimResourceProviderLogic */
- override fun onConsume(
- ctx: SimResourceControllableContext,
- now: Long,
- delta: Long,
- limit: Double,
- duration: Long
- ) {
- doUpdateCounters(delta)
-
- actualRate = 0.0
- this.limit = limit
- _lastPull = now
-
- runScheduler(now)
- }
-
- override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- parent?.onConverge(now)
- }
-
- override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- doUpdateCounters(delta)
-
- limit = 0.0
- actualRate = 0.0
- _lastPull = now
- }
-
- /* Comparable */
- override fun compareTo(other: Output): Int = allowedRate.compareTo(other.allowedRate)
-
- /**
- * Pull the next command if necessary.
- */
- fun pull(now: Long) {
- val ctx = ctx
- if (ctx != null && _lastPull < now) {
- ctx.flush()
- }
- }
-
- /**
- * Helper method to update the resource counters of the distributor.
- */
- private fun doUpdateCounters(delta: Long) {
- if (delta <= 0L) {
- return
- }
-
- // Compute the performance penalty due to resource interference
- val perfScore = if (interferenceDomain != null) {
- val load = _rate / capacity
- interferenceDomain.apply(key, load)
- } else {
- 1.0
- }
-
- val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualRate * deltaS
- val remainingWork = work - actualWork
-
- updateCounters(work, actualWork, remainingWork)
-
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
- }
- }
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private inner class Input(private val provider: SimResourceProvider) : SimResourceConsumer, Comparable<Input> {
- /**
- * The active [SimResourceContext] of this consumer.
- */
- private var _ctx: SimResourceContext? = null
-
- /**
- * The capacity of this input.
- */
- val capacity: Double
- get() = _ctx?.capacity ?: 0.0
-
- /**
- * Push the specified rate to the provider.
- */
- fun push(rate: Double) {
- _ctx?.push(rate)
- }
-
- /**
- * Cancel this input.
- */
- fun cancel() {
- provider.cancel()
- }
-
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- runScheduler(now)
- return Long.MAX_VALUE
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- assert(_ctx == null) { "Consumer running concurrently" }
- _ctx = ctx
- updateCapacity()
- }
- SimResourceEvent.Exit -> {
- _ctx = null
- updateCapacity()
- }
- SimResourceEvent.Capacity -> updateCapacity()
- else -> {}
- }
- }
-
- override fun compareTo(other: Input): Int = capacity.compareTo(other.capacity)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
deleted file mode 100644
index 1428ce42..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import io.mockk.*
-import kotlinx.coroutines.*
-import org.junit.jupiter.api.*
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceContextImpl
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
-
-/**
- * A test suite for the [SimResourceContextImpl] class.
- */
-class SimResourceContextTest {
- @Test
- fun testFlushWithoutCommand() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(1.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(interpreter, consumer, logic)
-
- interpreter.scheduleSync(interpreter.clock.millis(), context)
- }
-
- @Test
- fun testIntermediateFlush() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = SimWorkConsumer(1.0, 1.0)
-
- val logic = spyk(object : SimResourceProviderLogic {})
- val context = SimResourceContextImpl(interpreter, consumer, logic)
- context.capacity = 1.0
-
- context.start()
- delay(1) // Delay 1 ms to prevent hitting the fast path
- interpreter.scheduleSync(interpreter.clock.millis(), context)
-
- verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) }
- }
-
- @Test
- fun testIntermediateFlushIdle() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = SimWorkConsumer(1.0, 1.0)
-
- val logic = spyk(object : SimResourceProviderLogic {})
- val context = SimResourceContextImpl(interpreter, consumer, logic)
- context.capacity = 1.0
-
- context.start()
- delay(500)
- context.invalidate()
- delay(500)
- context.invalidate()
-
- assertAll(
- { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } },
- { verify(exactly = 1) { logic.onFinish(any(), any(), any()) } }
- )
- }
-
- @Test
- fun testDoubleStart() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(0.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(interpreter, consumer, logic)
-
- context.start()
-
- assertThrows<IllegalStateException> {
- context.start()
- }
- }
-
- @Test
- fun testIdempotentCapacityChange() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(1.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- })
-
- val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(interpreter, consumer, logic)
- context.capacity = 4200.0
- context.start()
- context.capacity = 4200.0
-
- verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
- }
-
- @Test
- fun testFailureNoInfiniteLoop() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
- return Long.MAX_VALUE
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent")
- }
-
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
- throw IllegalStateException("onFailure")
- }
- })
-
- val logic = object : SimResourceProviderLogic {}
-
- val context = SimResourceContextImpl(interpreter, consumer, logic)
-
- context.start()
-
- delay(1)
-
- verify(exactly = 1) { consumer.onFailure(any(), any()) }
- }
-}