summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt10
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt2
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt10
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt10
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt2
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt42
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt2
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt20
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt49
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt24
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt26
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt48
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt36
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts (renamed from opendc-simulator/opendc-simulator-resources/build.gradle.kts)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt (renamed from opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt)78
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt)63
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt)30
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt)55
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt45
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt)24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt)16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt)25
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt)16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt)97
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt)34
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt)31
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt)6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt)79
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt)10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt)77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt)37
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt127
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt399
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt)32
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt)8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt)36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt)29
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt152
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt)110
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt)112
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt)83
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt)72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt (renamed from opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt)23
-rw-r--r--opendc-simulator/opendc-simulator-network/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt12
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt10
-rw-r--r--opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt21
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt50
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt28
-rw-r--r--opendc-simulator/opendc-simulator-power/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt36
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt6
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt12
-rw-r--r--opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt33
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt52
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt46
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt44
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt54
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt129
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt407
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt173
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt4
-rw-r--r--settings.gradle.kts2
94 files changed, 1759 insertions, 1813 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index a8b8afe9..f499927d 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -47,7 +47,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong
@@ -61,7 +61,7 @@ public class SimHost(
model: MachineModel,
override val meta: Map<String, Any>,
context: CoroutineContext,
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
meterProvider: MeterProvider,
hypervisor: SimHypervisorProvider,
scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(),
@@ -78,7 +78,7 @@ public class SimHost(
/**
* The clock instance used by the host.
*/
- private val clock = interpreter.clock
+ private val clock = engine.clock
/**
* The logger instance of this server.
@@ -98,13 +98,13 @@ public class SimHost(
/**
* The machine to run on.
*/
- public val machine: SimBareMetalMachine = SimBareMetalMachine(interpreter, model.optimize(), powerDriver)
+ public val machine: SimBareMetalMachine = SimBareMetalMachine(engine, model.optimize(), powerDriver)
/**
* The hypervisor to run multiple workloads.
*/
private val hypervisor: SimHypervisor = hypervisor.create(
- interpreter,
+ engine,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
listener = object : SimHypervisor.Listener {
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 7f33154a..eda76ba0 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -198,7 +198,7 @@ internal class Guest(
}
/**
- * Run the process that models the virtual machine lifecycle as a coroutine.
+ * Converge the process that models the virtual machine lifecycle as a coroutine.
*/
private suspend fun runMachine(workload: SimWorkload) {
delay(1) // TODO Introduce model for boot time
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
index 6919b7fd..7d46e626 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/HostFaultInjectorImpl.kt
@@ -75,7 +75,7 @@ internal class HostFaultInjectorImpl(
}
/**
- * Run the injection process.
+ * Converge the injection process.
*/
private suspend fun runInjector() {
while (true) {
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e75c31a0..d2293be7 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -41,7 +41,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.HOST_ID
import org.opendc.telemetry.compute.table.HostData
@@ -87,14 +87,14 @@ internal class SimHostTest {
.setClock(clock.toOtelClock())
.build()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val virtDriver = SimHost(
uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
- interpreter,
+ engine,
meterProvider,
SimFairShareHypervisorProvider()
)
@@ -199,14 +199,14 @@ internal class SimHostTest {
.setClock(clock.toOtelClock())
.build()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val host = SimHost(
uid = hostId,
name = "test",
model = machineModel,
meta = emptyMap(),
coroutineContext,
- interpreter,
+ engine,
meterProvider,
SimFairShareHypervisorProvider()
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
index ed45bd8a..283f82fe 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt
@@ -36,7 +36,7 @@ import org.opendc.compute.simulator.SimHost
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.*
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
@@ -73,9 +73,9 @@ public class ComputeWorkloadRunner(
private val _metricProducers = mutableListOf<MetricProducer>()
/**
- * The [SimResourceInterpreter] to simulate the hosts.
+ * The [FlowEngine] to simulate the hosts.
*/
- private val interpreter = SimResourceInterpreter(context, clock)
+ private val engine = FlowEngine(context, clock)
/**
* The hosts that belong to this class.
@@ -89,7 +89,7 @@ public class ComputeWorkloadRunner(
}
/**
- * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace].
+ * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace].
*/
public suspend fun run(trace: List<VirtualMachine>, seed: Long) {
val random = Random(seed)
@@ -178,7 +178,7 @@ public class ComputeWorkloadRunner(
spec.model,
spec.meta,
context,
- interpreter,
+ engine,
meterProvider,
spec.hypervisor,
powerDriver = spec.powerDriver,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 21ff3ab0..4e855f82 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -131,7 +131,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
// Instantiate the desired topology
runner.apply(topology)
- // Run the workload trace
+ // Converge the workload trace
runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 1fbd3b6a..2ba65e90 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
import java.time.Clock
import java.util.*
import kotlin.coroutines.Continuation
@@ -65,7 +65,7 @@ public class SimTFDevice(
* The [SimMachine] representing the device.
*/
private val machine = SimBareMetalMachine(
- SimResourceInterpreter(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)),
+ FlowEngine(scope.coroutineContext, clock), MachineModel(listOf(pu), listOf(memory)),
SimplePowerDriver(powerModel)
)
@@ -95,11 +95,11 @@ public class SimTFDevice(
/**
* The workload that will be run by the device.
*/
- private val workload = object : SimWorkload, SimResourceConsumer {
+ private val workload = object : SimWorkload, FlowSource {
/**
* The resource context to interrupt the workload with.
*/
- var ctx: SimResourceContext? = null
+ var ctx: FlowConnection? = null
/**
* The capacity of the device.
@@ -128,16 +128,16 @@ public class SimTFDevice(
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val consumedWork = ctx.speed * delta / 1000.0
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumedWork = conn.rate * delta / 1000.0
val activeWork = activeWork
if (activeWork != null) {
if (activeWork.consume(consumedWork)) {
this.activeWork = null
} else {
- val duration = (activeWork.flops / ctx.capacity * 1000).roundToLong()
- ctx.push(ctx.capacity)
+ val duration = (activeWork.flops / conn.capacity * 1000).roundToLong()
+ conn.push(conn.capacity)
return duration
}
}
@@ -146,27 +146,27 @@ public class SimTFDevice(
val head = queue.poll()
return if (head != null) {
this.activeWork = head
- val duration = (head.flops / ctx.capacity * 1000).roundToLong()
- ctx.push(ctx.capacity)
+ val duration = (head.flops / conn.capacity * 1000).roundToLong()
+ conn.push(conn.capacity)
duration
} else {
- ctx.push(0.0)
+ conn.push(0.0)
Long.MAX_VALUE
}
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- this.capacity = ctx.capacity
+ FlowEvent.Start -> {
+ this.ctx = conn
+ this.capacity = conn.capacity
}
- SimResourceEvent.Capacity -> {
- this.capacity = ctx.capacity
- ctx.interrupt()
+ FlowEvent.Capacity -> {
+ this.capacity = conn.capacity
+ conn.pull()
}
- SimResourceEvent.Run -> {
- _usage.record(ctx.speed)
+ FlowEvent.Converge -> {
+ _usage.record(conn.rate)
_power.record(machine.psu.powerDraw)
}
else -> {}
@@ -188,7 +188,7 @@ public class SimTFDevice(
override suspend fun compute(flops: Double) = suspendCancellableCoroutine<Unit> { cont ->
workload.queue.add(Work(flops, cont))
if (workload.isIdle) {
- workload.ctx?.interrupt()
+ workload.ctx?.pull()
}
}
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
index 5839c0df..3e755b56 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/distribute/Strategy.kt
@@ -27,7 +27,7 @@ package org.opendc.experiments.tf20.distribute
*/
public interface Strategy {
/**
- * Run the specified batch using the given strategy.
+ * Converge the specified batch using the given strategy.
*/
public suspend fun run(forward: Double, backward: Double, batchSize: Int)
}
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 68bdc337..020d75b5 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -36,7 +36,7 @@ import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import java.time.Clock
import java.util.ArrayDeque
import kotlin.coroutines.Continuation
@@ -74,7 +74,7 @@ public class SimFunctionDeployer(
* The machine that will execute the workloads.
*/
public val machine: SimMachine = SimBareMetalMachine(
- SimResourceInterpreter(scope.coroutineContext, clock),
+ FlowEngine(scope.coroutineContext, clock),
model,
SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index 7d06ee62..e2290a14 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -31,7 +31,7 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcSimulator.opendcSimulatorResources)
+ api(projects.opendcSimulator.opendcSimulatorFlow)
api(projects.opendcSimulator.opendcSimulatorPower)
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 88ad7286..c57919c1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -36,7 +36,7 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
@@ -48,13 +48,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var interpreter: SimResourceInterpreter
+ private lateinit var engine: FlowEngine
private lateinit var machineModel: MachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
+ engine = FlowEngine(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -80,7 +80,7 @@ class SimMachineBenchmarks {
fun benchmarkBareMetal(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace))
}
@@ -90,9 +90,9 @@ class SimMachineBenchmarks {
fun benchmarkSpaceSharedHypervisor(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
@@ -111,9 +111,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorSingle(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(interpreter)
+ val hypervisor = SimFairShareHypervisor(engine)
launch { machine.run(hypervisor) }
@@ -132,9 +132,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorDouble(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(interpreter)
+ val hypervisor = SimFairShareHypervisor(engine)
launch { machine.run(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index f9db048d..6a62d8a5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -30,22 +30,22 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.NetworkAdapter
import org.opendc.simulator.compute.model.StorageDevice
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
*
- * @param interpreter The interpreter to manage the machine's resources.
+ * @param engine The engine to manage the machine's resources.
* @param parent The parent simulation system.
* @param model The model of the machine.
*/
public abstract class SimAbstractMachine(
- protected val interpreter: SimResourceInterpreter,
- final override val parent: SimResourceSystem?,
+ protected val engine: FlowEngine,
+ final override val parent: FlowSystem?,
final override val model: MachineModel
-) : SimMachine, SimResourceSystem {
+) : SimMachine, FlowSystem {
/**
* The resources allocated for this machine.
*/
@@ -54,17 +54,17 @@ public abstract class SimAbstractMachine(
/**
* The memory interface of the machine.
*/
- public val memory: SimMemory = Memory(SimResourceSource(model.memory.sumOf { it.size }.toDouble(), interpreter), model.memory)
+ public val memory: SimMemory = Memory(FlowSink(engine, model.memory.sumOf { it.size }.toDouble()), model.memory)
/**
* The network interfaces available to the machine.
*/
- public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(adapter, i) }
+ public val net: List<SimNetworkInterface> = model.net.mapIndexed { i, adapter -> NetworkAdapterImpl(engine, adapter, i) }
/**
* The network interfaces available to the machine.
*/
- public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(interpreter, device, i) }
+ public val storage: List<SimStorageInterface> = model.storage.mapIndexed { i, device -> StorageDeviceImpl(engine, device, i) }
/**
* The peripherals of the machine.
@@ -82,7 +82,7 @@ public abstract class SimAbstractMachine(
private var cont: Continuation<Unit>? = null
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
check(!isTerminated) { "Machine is terminated" }
@@ -96,14 +96,14 @@ public abstract class SimAbstractMachine(
// Cancel all cpus on cancellation
cont.invokeOnCancellation {
this.cont = null
- interpreter.batch {
+ engine.batch {
for (cpu in cpus) {
cpu.cancel()
}
}
}
- interpreter.batch { workload.onStart(ctx) }
+ engine.batch { workload.onStart(ctx) }
}
}
@@ -120,7 +120,7 @@ public abstract class SimAbstractMachine(
* Cancel the workload that is currently running on the machine.
*/
private fun cancel() {
- interpreter.batch {
+ engine.batch {
for (cpu in cpus) {
cpu.cancel()
}
@@ -137,8 +137,8 @@ public abstract class SimAbstractMachine(
* The execution context in which the workload runs.
*/
private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val interpreter: SimResourceInterpreter
- get() = this@SimAbstractMachine.interpreter
+ override val engine: FlowEngine
+ get() = this@SimAbstractMachine.engine
override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
@@ -154,7 +154,7 @@ public abstract class SimAbstractMachine(
/**
* The [SimMemory] implementation for a machine.
*/
- private class Memory(source: SimResourceSource, override val models: List<MemoryUnit>) : SimMemory, SimResourceProvider by source {
+ private class Memory(source: FlowSink, override val models: List<MemoryUnit>) : SimMemory, FlowConsumer by source {
override fun toString(): String = "SimAbstractMachine.Memory"
}
@@ -162,6 +162,7 @@ public abstract class SimAbstractMachine(
* The [SimNetworkAdapter] implementation for a machine.
*/
private class NetworkAdapterImpl(
+ private val engine: FlowEngine,
model: NetworkAdapter,
index: Int
) : SimNetworkAdapter(), SimNetworkInterface {
@@ -169,18 +170,18 @@ public abstract class SimAbstractMachine(
override val bandwidth: Double = model.bandwidth
- override val provider: SimResourceProvider
+ override val provider: FlowConsumer
get() = _rx
- override fun createConsumer(): SimResourceConsumer = _tx
+ override fun createConsumer(): FlowSource = _tx
- override val tx: SimResourceProvider
+ override val tx: FlowConsumer
get() = _tx
- private val _tx = SimResourceForwarder()
+ private val _tx = FlowForwarder(engine)
- override val rx: SimResourceConsumer
+ override val rx: FlowSource
get() = _rx
- private val _rx = SimResourceForwarder()
+ private val _rx = FlowForwarder(engine)
override fun toString(): String = "SimAbstractMachine.NetworkAdapterImpl[name=$name,bandwidth=$bandwidth]"
}
@@ -189,7 +190,7 @@ public abstract class SimAbstractMachine(
* The [SimStorageInterface] implementation for a machine.
*/
private class StorageDeviceImpl(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
model: StorageDevice,
index: Int
) : SimStorageInterface {
@@ -197,9 +198,9 @@ public abstract class SimAbstractMachine(
override val capacity: Double = model.capacity
- override val read: SimResourceProvider = SimResourceSource(model.readBandwidth, interpreter)
+ override val read: FlowConsumer = FlowSink(engine, model.readBandwidth)
- override val write: SimResourceProvider = SimResourceSource(model.writeBandwidth, interpreter)
+ override val write: FlowConsumer = FlowSink(engine, model.writeBandwidth)
override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 639ca450..37cf282b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -26,8 +26,8 @@ import org.opendc.simulator.compute.device.SimPsu
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.PowerDriver
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.FlowEngine
/**
* A simulated bare-metal machine that is able to run a single workload.
@@ -35,19 +35,19 @@ import org.opendc.simulator.resources.SimResourceInterpreter
* A [SimBareMetalMachine] is a stateful object, and you should be careful when operating this object concurrently. For
* example, the class expects only a single concurrent call to [run].
*
- * @param interpreter The [SimResourceInterpreter] to drive the simulation.
+ * @param engine The [FlowEngine] to drive the simulation.
* @param model The machine model to simulate.
* @param powerDriver The power driver to use.
* @param psu The power supply of the machine.
* @param parent The parent simulation system.
*/
public class SimBareMetalMachine(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
model: MachineModel,
powerDriver: PowerDriver,
public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)),
- parent: SimResourceSystem? = null,
-) : SimAbstractMachine(interpreter, parent, model) {
+ parent: FlowSystem? = null,
+) : SimAbstractMachine(engine, parent, model) {
/**
* The power draw of the machine onto the PSU.
*/
@@ -58,7 +58,7 @@ public class SimBareMetalMachine(
* The processing units of the machine.
*/
override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu ->
- Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu)
+ Cpu(FlowSink(engine, cpu.frequency, this@SimBareMetalMachine), cpu)
}
/**
@@ -78,9 +78,9 @@ public class SimBareMetalMachine(
* A [SimProcessingUnit] of a bare-metal machine.
*/
private class Cpu(
- private val source: SimResourceSource,
+ private val source: FlowSink,
override val model: ProcessingUnit
- ) : SimProcessingUnit, SimResourceProvider by source {
+ ) : SimProcessingUnit, FlowConsumer by source {
override var capacity: Double
get() = source.capacity
set(value) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
index d8dd8205..ab0b56ae 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt
@@ -41,7 +41,7 @@ public interface SimMachine : AutoCloseable {
public val peripherals: List<SimPeripheral>
/**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
public suspend fun run(workload: SimWorkload, meta: Map<String, Any> = emptyMap())
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index 6996a30d..1317f728 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* A simulated execution context in which a bootable image runs. This interface represents the
@@ -31,9 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter
*/
public interface SimMachineContext : AutoCloseable {
/**
- * The resource interpreter that simulates the machine.
+ * The [FlowEngine] that simulates the machine.
*/
- public val interpreter: SimResourceInterpreter
+ public val engine: FlowEngine
/**
* The metadata associated with the context.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt
index 6623df23..b1aef495 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMemory.kt
@@ -23,12 +23,12 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.MemoryUnit
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
/**
* An interface to control the memory usage of simulated workloads.
*/
-public interface SimMemory : SimResourceProvider {
+public interface SimMemory : FlowConsumer {
/**
* The models representing the static information of the memory units supporting this interface.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt
index 1ac126ae..660b2871 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimNetworkInterface.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowSource
/**
* A firmware interface to a network adapter.
@@ -42,10 +42,10 @@ public interface SimNetworkInterface {
/**
* The resource provider for the transmit channel of the network interface.
*/
- public val tx: SimResourceProvider
+ public val tx: FlowConsumer
/**
* The resource consumer for the receive channel of the network interface.
*/
- public val rx: SimResourceConsumer
+ public val rx: FlowSource
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
index 93c9ddfa..c9f36ece 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
@@ -23,12 +23,12 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
/**
* A simulated processing unit.
*/
-public interface SimProcessingUnit : SimResourceProvider {
+public interface SimProcessingUnit : FlowConsumer {
/**
* The capacity of the processing unit, which can be adjusted by the workload if supported by the machine.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt
index 21a801f1..3d648671 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimStorageInterface.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
/**
* A firmware interface to a storage device.
@@ -41,10 +41,10 @@ public interface SimStorageInterface {
/**
* The resource provider for the read operations of the storage device.
*/
- public val read: SimResourceProvider
+ public val read: FlowConsumer
/**
* The resource consumer for the write operation of the storage device.
*/
- public val write: SimResourceProvider
+ public val write: FlowConsumer
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index 6e6e590f..b05d8ad9 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -23,10 +23,10 @@
package org.opendc.simulator.compute.device
import org.opendc.simulator.compute.power.PowerDriver
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
import org.opendc.simulator.power.SimPowerInlet
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
import java.util.*
/**
@@ -54,7 +54,7 @@ public class SimPsu(
/**
* The consumer context.
*/
- private var _ctx: SimResourceContext? = null
+ private var _ctx: FlowConnection? = null
/**
* The driver that is connected to the PSU.
@@ -69,7 +69,7 @@ public class SimPsu(
* Update the power draw of the PSU.
*/
public fun update() {
- _ctx?.interrupt()
+ _ctx?.pull()
}
/**
@@ -81,18 +81,18 @@ public class SimPsu(
update()
}
- override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun createConsumer(): FlowSource = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
- ctx.push(powerDraw)
+ conn.push(powerDraw)
return Long.MAX_VALUE
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> _ctx = ctx
- SimResourceEvent.Run -> _powerDraw = ctx.speed
- SimResourceEvent.Exit -> _ctx = null
+ FlowEvent.Start -> _ctx = conn
+ FlowEvent.Converge -> _powerDraw = conn.rate
+ FlowEvent.Exit -> _ctx = null
else -> {}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index cf9e3230..b145eefc 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -28,17 +28,17 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.SimResourceSwitch
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.FlowMultiplexer
/**
* Abstract implementation of the [SimHypervisor] interface.
*
- * @param interpreter The resource interpreter to use.
+ * @param engine The [FlowEngine] to drive the simulation.
* @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware.
*/
public abstract class SimAbstractHypervisor(
- private val interpreter: SimResourceInterpreter,
+ protected val engine: FlowEngine,
private val scalingGovernor: ScalingGovernor? = null,
protected val interferenceDomain: VmInterferenceDomain? = null
) : SimHypervisor {
@@ -50,7 +50,7 @@ public abstract class SimAbstractHypervisor(
/**
* The resource switch to use.
*/
- private lateinit var switch: SimResourceSwitch
+ private lateinit var mux: FlowMultiplexer
/**
* The virtual machines running on this hypervisor.
@@ -62,8 +62,8 @@ public abstract class SimAbstractHypervisor(
/**
* The resource counters associated with the hypervisor.
*/
- public override val counters: SimResourceCounters
- get() = switch.counters
+ public override val counters: FlowCounters
+ get() = mux.counters
/**
* The scaling governors attached to the physical CPUs backing this hypervisor.
@@ -71,14 +71,14 @@ public abstract class SimAbstractHypervisor(
private val governors = mutableListOf<ScalingGovernor.Logic>()
/**
- * Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs.
+ * Construct the [FlowMultiplexer] implementation that performs the actual scheduling of the CPUs.
*/
- public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch
+ public abstract fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer
/**
* Check whether the specified machine model fits on this hypervisor.
*/
- public abstract fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean
+ public abstract fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean
/**
* Trigger the governors to recompute the scaling limits.
@@ -91,7 +91,7 @@ public abstract class SimAbstractHypervisor(
/* SimHypervisor */
override fun canFit(model: MachineModel): Boolean {
- return canFit(model, switch)
+ return canFit(model, mux)
}
override fun createMachine(model: MachineModel, interferenceId: String?): SimMachine {
@@ -104,7 +104,7 @@ public abstract class SimAbstractHypervisor(
/* SimWorkload */
override fun onStart(ctx: SimMachineContext) {
context = ctx
- switch = createSwitch(ctx)
+ mux = createMultiplexer(ctx)
for (cpu in ctx.cpus) {
val governor = scalingGovernor?.createLogic(ScalingPolicyImpl(cpu))
@@ -113,7 +113,7 @@ public abstract class SimAbstractHypervisor(
governor.onStart()
}
- switch.addInput(cpu)
+ mux.addOutput(cpu)
}
}
@@ -122,7 +122,7 @@ public abstract class SimAbstractHypervisor(
*
* @param model The machine model of the virtual machine.
*/
- private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(interpreter, parent = null, model) {
+ private inner class VirtualMachine(model: MachineModel, interferenceId: String? = null) : SimAbstractMachine(engine, parent = null, model) {
/**
* The interference key of this virtual machine.
*/
@@ -131,7 +131,7 @@ public abstract class SimAbstractHypervisor(
/**
* The vCPUs of the machine.
*/
- override val cpus = model.cpus.map { VCpu(switch, switch.newOutput(interferenceKey), it) }
+ override val cpus = model.cpus.map { VCpu(mux, mux.newInput(interferenceKey), it) }
override fun close() {
super.close()
@@ -153,10 +153,10 @@ public abstract class SimAbstractHypervisor(
* A [SimProcessingUnit] of a virtual machine.
*/
private class VCpu(
- private val switch: SimResourceSwitch,
- private val source: SimResourceProvider,
+ private val switch: FlowMultiplexer,
+ private val source: FlowConsumer,
override val model: ProcessingUnit
- ) : SimProcessingUnit, SimResourceProvider by source {
+ ) : SimProcessingUnit, FlowConsumer by source {
override var capacity: Double
get() = source.capacity
set(_) {
@@ -169,7 +169,7 @@ public abstract class SimAbstractHypervisor(
* Close the CPU
*/
fun close() {
- switch.removeOutput(source)
+ switch.removeInput(source)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
index 3b44292d..36ab7c1c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt
@@ -28,39 +28,39 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSwitch
-import org.opendc.simulator.resources.SimResourceSwitchMaxMin
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload]s on a single [SimMachine]
* concurrently using weighted fair sharing.
*
- * @param interpreter The interpreter to manage the machine's resources.
+ * @param engine The [FlowEngine] to manage the machine's resources.
* @param parent The parent simulation system.
* @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
* @param interferenceDomain The resource interference domain to which the hypervisor belongs.
* @param listener The hypervisor listener to use.
*/
public class SimFairShareHypervisor(
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
+ engine: FlowEngine,
+ private val parent: FlowSystem? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
private val listener: SimHypervisor.Listener? = null
-) : SimAbstractHypervisor(interpreter, scalingGovernor, interferenceDomain) {
+) : SimAbstractHypervisor(engine, scalingGovernor, interferenceDomain) {
- override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean = true
+ override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean = true
- override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
+ override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
return SwitchSystem(ctx).switch
}
- private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem {
- val switch = SimResourceSwitchMaxMin(interpreter, this, interferenceDomain)
+ private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowSystem {
+ val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain)
- override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
+ override val parent: FlowSystem? = this@SimFairShareHypervisor.parent
private var lastCpuUsage = 0.0
private var lastCpuDemand = 0.0
@@ -87,8 +87,8 @@ public class SimFairShareHypervisor(
}
lastReport = timestamp
- lastCpuDemand = switch.inputs.sumOf { it.demand }
- lastCpuUsage = switch.inputs.sumOf { it.speed }
+ lastCpuDemand = switch.outputs.sumOf { it.demand }
+ lastCpuUsage = switch.outputs.sumOf { it.rate }
lastDemand = counters.demand
lastActual = counters.actual
lastOvercommit = counters.overcommit
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
index 8d0592ec..bfa099fb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -34,13 +34,13 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
override fun create(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem?,
+ engine: FlowEngine,
+ parent: FlowSystem?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
): SimHypervisor = SimFairShareHypervisor(
- interpreter,
+ engine,
parent,
scalingGovernor = scalingGovernor,
interferenceDomain = interferenceDomain,
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
index 3b49d515..1b11ca6b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt
@@ -25,7 +25,7 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.SimResourceCounters
+import org.opendc.simulator.flow.FlowCounters
/**
* A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload
@@ -40,7 +40,7 @@ public interface SimHypervisor : SimWorkload {
/**
* The resource counters associated with the hypervisor.
*/
- public val counters: SimResourceCounters
+ public val counters: FlowCounters
/**
* Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
index b307a34d..97f07097 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -43,8 +43,8 @@ public interface SimHypervisorProvider {
* Create a [SimHypervisor] instance with the specified [listener].
*/
public fun create(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null,
+ engine: FlowEngine,
+ parent: FlowSystem? = null,
scalingGovernor: ScalingGovernor? = null,
interferenceDomain: VmInterferenceDomain? = null,
listener: SimHypervisor.Listener? = null
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
index ac1c0250..883e0d82 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt
@@ -24,19 +24,19 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.MachineModel
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSwitch
-import org.opendc.simulator.resources.SimResourceSwitchExclusive
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter) {
- override fun canFit(model: MachineModel, switch: SimResourceSwitch): Boolean {
- return switch.inputs.size - switch.outputs.size >= model.cpus.size
+public class SimSpaceSharedHypervisor(engine: FlowEngine) : SimAbstractHypervisor(engine) {
+ override fun canFit(model: MachineModel, switch: FlowMultiplexer): Boolean {
+ return switch.outputs.size - switch.inputs.size >= model.cpus.size
}
- override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
- return SimResourceSwitchExclusive()
+ override fun createMultiplexer(ctx: SimMachineContext): FlowMultiplexer {
+ return ForwardingFlowMultiplexer(engine)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
index 3906cb9a..7869d72d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel
import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSystem
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -34,10 +34,10 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
override fun create(
- interpreter: SimResourceInterpreter,
- parent: SimResourceSystem?,
+ engine: FlowEngine,
+ parent: FlowSystem?,
scalingGovernor: ScalingGovernor?,
interferenceDomain: VmInterferenceDomain?,
listener: SimHypervisor.Listener?
- ): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
+ ): SimHypervisor = SimSpaceSharedHypervisor(engine)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
index 1801fcd0..b737d61a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.resources.interference.InterferenceDomain
-import org.opendc.simulator.resources.interference.InterferenceKey
+import org.opendc.simulator.flow.interference.InterferenceDomain
+import org.opendc.simulator.flow.interference.InterferenceKey
/**
* The interference domain of a hypervisor.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
index c2e00c8e..b3d72507 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.compute.kernel.interference
-import org.opendc.simulator.resources.interference.InterferenceKey
+import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.*
/**
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
index 6577fbfc..f71446f8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
@@ -46,7 +46,7 @@ public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver {
for (cpu in cpus) {
targetFreq = max(cpu.capacity, targetFreq)
- totalSpeed += cpu.speed
+ totalSpeed += cpu.rate
}
val maxFreq = states.lastKey()
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
index bf7aeff1..34e91c35 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
@@ -37,7 +37,7 @@ public class SimplePowerDriver(private val model: PowerModel) : PowerDriver {
for (cpu in cpus) {
targetFreq += cpu.capacity
- totalSpeed += cpu.speed
+ totalSpeed += cpu.rate
}
return model.computePower(totalSpeed / targetFreq)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index a01fa20c..99f4a1e1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* A [SimWorkload] that models applications as a static number of floating point operations ([flops]) executed on
@@ -44,7 +44,7 @@ public class SimFlopsWorkload(
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
- cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)))
+ cpu.startConsumer(lifecycle.waitFor(FixedFlowSource(flops.toDouble() / ctx.cpus.size, utilization)))
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index 4ee56689..2ef3bc43 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* A [SimWorkload] that models application execution as a single duration.
@@ -44,7 +44,7 @@ public class SimRuntimeWorkload(
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
val limit = cpu.capacity * utilization
- cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization)))
+ cpu.startConsumer(lifecycle.waitFor(FixedFlowSource((limit / 1000) * duration, utilization)))
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index dd582bb2..a877dac1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -24,8 +24,8 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
import kotlin.math.min
/**
@@ -78,12 +78,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
return now >= timestamp + duration
}
- private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ private inner class Consumer(val cpu: ProcessingUnit) : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val fragment = pullFragment(now)
if (fragment == null) {
- ctx.close()
+ conn.close()
return Long.MAX_VALUE
}
@@ -91,7 +91,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
// Fragment is in the future
if (timestamp > now) {
- ctx.push(0.0)
+ conn.push(0.0)
return timestamp - now
}
@@ -103,7 +103,7 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val
val deadline = timestamp + fragment.duration
val duration = deadline - now
- ctx.push(if (cpu.id < cores && usage > 0.0) usage else 0.0)
+ conn.push(if (cpu.id < cores && usage > 0.0) usage else 0.0)
return duration
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index 5dd18271..dabe60e0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -23,9 +23,9 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
/**
* A helper class to manage the lifecycle of a [SimWorkload]
@@ -34,27 +34,27 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
/**
* The resource consumers which represent the lifecycle of the workload.
*/
- private val waiting = mutableSetOf<SimResourceConsumer>()
+ private val waiting = mutableSetOf<FlowSource>()
/**
* Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
*/
- public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer {
+ public fun waitFor(consumer: FlowSource): FlowSource {
waiting.add(consumer)
- return object : SimResourceConsumer by consumer {
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ return object : FlowSource by consumer {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
try {
- consumer.onEvent(ctx, event)
+ consumer.onEvent(conn, now, event)
} finally {
- if (event == SimResourceEvent.Exit) {
+ if (event == FlowEvent.Exit) {
complete(consumer)
}
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
try {
- consumer.onFailure(ctx, cause)
+ consumer.onFailure(conn, cause)
} finally {
complete(consumer)
}
@@ -65,9 +65,9 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
}
/**
- * Complete the specified [SimResourceConsumer].
+ * Complete the specified [FlowSource].
*/
- private fun complete(consumer: SimResourceConsumer) {
+ private fun complete(consumer: FlowSource) {
if (waiting.remove(consumer) && waiting.isEmpty()) {
ctx.close()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 81268879..0bb24ed8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -34,10 +34,10 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.compute.workload.SimWorkloadLifecycle
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.source.FixedFlowSource
import org.opendc.simulator.network.SimNetworkSink
import org.opendc.simulator.power.SimPowerSource
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
* Test suite for the [SimBareMetalMachine] class.
@@ -60,7 +60,7 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -83,7 +83,7 @@ class SimMachineTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -100,13 +100,13 @@ class SimMachineTest {
@Test
fun testPower() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(LinearPowerModel(100.0, 50.0))
)
- val source = SimPowerSource(interpreter, capacity = 1000.0)
+ val source = SimPowerSource(engine, capacity = 1000.0)
source.connect(machine.psu)
try {
@@ -125,7 +125,7 @@ class SimMachineTest {
@Test
fun testCapacityClamp() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -151,7 +151,7 @@ class SimMachineTest {
@Test
fun testMemory() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -171,7 +171,7 @@ class SimMachineTest {
@Test
fun testMemoryUsage() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -180,7 +180,7 @@ class SimMachineTest {
machine.run(object : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
- ctx.memory.startConsumer(lifecycle.waitFor(SimWorkConsumer(ctx.memory.capacity, utilization = 0.8)))
+ ctx.memory.startConsumer(lifecycle.waitFor(FixedFlowSource(ctx.memory.capacity, utilization = 0.8)))
}
})
@@ -192,22 +192,22 @@ class SimMachineTest {
@Test
fun testNetUsage() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
val adapter = (machine.peripherals[0] as SimNetworkAdapter)
- adapter.connect(SimNetworkSink(interpreter, adapter.bandwidth))
+ adapter.connect(SimNetworkSink(engine, adapter.bandwidth))
try {
machine.run(object : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
val iface = ctx.net[0]
- iface.tx.startConsumer(lifecycle.waitFor(SimWorkConsumer(iface.bandwidth, utilization = 0.8)))
+ iface.tx.startConsumer(lifecycle.waitFor(FixedFlowSource(iface.bandwidth, utilization = 0.8)))
}
})
@@ -219,9 +219,9 @@ class SimMachineTest {
@Test
fun testDiskReadUsage() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -231,7 +231,7 @@ class SimMachineTest {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
val disk = ctx.storage[0]
- disk.read.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.read.capacity, utilization = 0.8)))
+ disk.read.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.read.capacity, utilization = 0.8)))
}
})
@@ -243,9 +243,9 @@ class SimMachineTest {
@Test
fun testDiskWriteUsage() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter,
+ engine,
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -255,7 +255,7 @@ class SimMachineTest {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
val disk = ctx.storage[0]
- disk.write.startConsumer(lifecycle.waitFor(SimWorkConsumer(disk.write.capacity, utilization = 0.8)))
+ disk.write.startConsumer(lifecycle.waitFor(FixedFlowSource(disk.write.capacity, utilization = 0.8)))
}
})
@@ -268,7 +268,7 @@ class SimMachineTest {
@Test
fun testCancellation() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -290,7 +290,7 @@ class SimMachineTest {
@Test
fun testConcurrentRuns() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -313,7 +313,7 @@ class SimMachineTest {
@Test
fun testClose() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt
index 6c9ec7bd..e5b509f0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/device/SimPsuTest.kt
@@ -29,8 +29,8 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.simulator.power.SimPowerSource
-import org.opendc.simulator.resources.SimResourceInterpreter
/**
* Test suite for [SimPsu]
@@ -55,8 +55,8 @@ internal class SimPsuTest {
val ratedOutputPower = 240.0
val energyEfficiency = mapOf(0.0 to 1.0)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = ratedOutputPower)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = ratedOutputPower)
val cpuLogic = mockk<PowerDriver.Logic>()
every { cpuLogic.computePower() } returns 0.0
@@ -78,8 +78,8 @@ internal class SimPsuTest {
1.0 to 0.94,
)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = ratedOutputPower)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = ratedOutputPower)
val cpuLogic = mockk<PowerDriver.Logic>()
every { cpuLogic.computePower() } returnsMany listOf(50.0, 100.0, 150.0, 200.0)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
index 8cd535ad..058d5d28 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt
@@ -40,7 +40,7 @@ import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* Test suite for the [SimHypervisor] class.
@@ -94,7 +94,7 @@ internal class SimHypervisorTest {
),
)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener)
@@ -163,7 +163,7 @@ internal class SimHypervisorTest {
)
)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -204,7 +204,7 @@ internal class SimHypervisorTest {
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -234,7 +234,7 @@ internal class SimHypervisorTest {
)
val interferenceModel = VmInterferenceModel(groups)
- val platform = SimResourceInterpreter(coroutineContext, clock)
+ val platform = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index 55d6d7c4..95fb6679 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -40,7 +40,7 @@ import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* A test suite for the [SimSpaceSharedHypervisor].
@@ -74,11 +74,11 @@ internal class SimSpaceSharedHypervisorTest {
),
)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ FlowEngine(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
val vm = hypervisor.createMachine(machineModel)
@@ -98,11 +98,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -121,11 +121,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -142,11 +142,11 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runBlockingSimulation {
val duration = 5 * 60L * 1000
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val engine = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
+ engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -170,11 +170,9 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val machine = SimBareMetalMachine(
- interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
- )
- val hypervisor = SimSpaceSharedHypervisor(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val machine = SimBareMetalMachine(engine, machineModel, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimSpaceSharedHypervisor(engine)
launch { machine.run(hypervisor) }
yield()
@@ -194,7 +192,7 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val interpreter = FlowEngine(coroutineContext, clock)
val machine = SimBareMetalMachine(
interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
index c39859bf..f557c8d3 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
@@ -55,7 +55,7 @@ internal class PStatePowerDriverTest {
val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true)
every { cpu.capacity } returns 3200.0
- every { cpu.speed } returns 1200.0
+ every { cpu.rate } returns 1200.0
val driver = PStatePowerDriver(
sortedMapOf(
@@ -77,10 +77,10 @@ internal class PStatePowerDriverTest {
val cpus = listOf(cpu, cpu)
every { cpus[0].capacity } returns 1000.0
- every { cpus[0].speed } returns 1200.0
+ every { cpus[0].rate } returns 1200.0
every { cpus[1].capacity } returns 3500.0
- every { cpus[1].speed } returns 1200.0
+ every { cpus[1].rate } returns 1200.0
val driver = PStatePowerDriver(
sortedMapOf(
@@ -112,11 +112,11 @@ internal class PStatePowerDriverTest {
val logic = driver.createLogic(machine, listOf(cpu))
- every { cpu.speed } returns 1400.0
+ every { cpu.rate } returns 1400.0
every { cpu.capacity } returns 1400.0
assertEquals(150.0, logic.computePower())
- every { cpu.speed } returns 1400.0
+ every { cpu.rate } returns 1400.0
every { cpu.capacity } returns 4000.0
assertEquals(235.0, logic.computePower())
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index 78019c2e..cdbffe4b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -31,7 +31,7 @@ import org.opendc.simulator.compute.model.*
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
/**
* Test suite for the [SimTraceWorkloadTest] class.
@@ -52,7 +52,7 @@ class SimTraceWorkloadTest {
@Test
fun testSmoke() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -79,7 +79,7 @@ class SimTraceWorkloadTest {
@Test
fun testOffset() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -106,7 +106,7 @@ class SimTraceWorkloadTest {
@Test
fun testSkipFragment() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
@@ -134,7 +134,7 @@ class SimTraceWorkloadTest {
@Test
fun testZeroCores() = runBlockingSimulation {
val machine = SimBareMetalMachine(
- SimResourceInterpreter(coroutineContext, clock),
+ FlowEngine(coroutineContext, clock),
machineModel,
SimplePowerDriver(ConstantPowerModel(0.0))
)
diff --git a/opendc-simulator/opendc-simulator-resources/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index 68047d5c..5a956fee 100644
--- a/opendc-simulator/opendc-simulator-resources/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-description = "Uniform resource consumption simulation model"
+description = "High-performance flow simulator"
plugins {
`kotlin-library-conventions`
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index fbc3f319..4834f10f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -20,13 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow.source.TraceFlowSource
import org.openjdk.jmh.annotations.*
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
@@ -36,101 +38,101 @@ import java.util.concurrent.TimeUnit
@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@OptIn(ExperimentalCoroutinesApi::class)
-class SimResourceBenchmarks {
+class FlowBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var interpreter: SimResourceInterpreter
+ private lateinit var engine: FlowEngine
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
+ engine = FlowEngine(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
class Workload {
- lateinit var trace: Sequence<SimTraceConsumer.Fragment>
+ lateinit var trace: Sequence<TraceFlowSource.Fragment>
@Setup
fun setUp() {
val random = ThreadLocalRandom.current()
- val entries = List(10000) { SimTraceConsumer.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
+ val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
trace = entries.asSequence()
}
}
@Benchmark
- fun benchmarkSource(state: Workload) {
+ fun benchmarkSink(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, interpreter)
- return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
+ val provider = FlowSink(engine, 4200.0)
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkForwardOverhead(state: Workload) {
+ fun benchmarkForward(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, interpreter)
- val forwarder = SimResourceForwarder()
+ val provider = FlowSink(engine, 4200.0)
+ val forwarder = FlowForwarder(engine)
provider.startConsumer(forwarder)
- return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
+ return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
+ fun benchmarkMuxMaxMinSingleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(interpreter)
+ val switch = MaxMinFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
- val provider = switch.newOutput()
- return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
+ fun benchmarkMuxMaxMinTripleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(interpreter)
+ val switch = MaxMinFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
repeat(3) {
launch {
- val provider = switch.newOutput()
- provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(state.trace))
}
}
}
}
@Benchmark
- fun benchmarkSwitchExclusiveSingleConsumer(state: Workload) {
+ fun benchmarkMuxExclusiveSingleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchExclusive()
+ val switch = ForwardingFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
- val provider = switch.newOutput()
- return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
}
}
@Benchmark
- fun benchmarkSwitchExclusiveTripleConsumer(state: Workload) {
+ fun benchmarkMuxExclusiveTripleSource(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchExclusive()
+ val switch = ForwardingFlowMultiplexer(engine)
- switch.addInput(SimResourceSource(3000.0, interpreter))
- switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
repeat(2) {
launch {
- val provider = switch.newOutput()
- provider.consume(SimTraceConsumer(state.trace))
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(state.trace))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index 085cba63..c8092082 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -20,25 +20,22 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
-import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+import org.opendc.simulator.flow.internal.FlowCountersImpl
/**
- * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations.
+ * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
*/
-public abstract class SimAbstractResourceProvider(
- private val interpreter: SimResourceInterpreter,
- initialCapacity: Double
-) : SimResourceProvider {
+public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer {
/**
- * A flag to indicate that the resource provider is active.
+ * A flag to indicate that the flow consumer is active.
*/
public override val isActive: Boolean
get() = ctx != null
/**
- * The capacity of the resource.
+ * The capacity of the consumer.
*/
public override var capacity: Double = initialCapacity
set(value) {
@@ -47,51 +44,51 @@ public abstract class SimAbstractResourceProvider(
}
/**
- * The current processing speed of the resource.
+ * The current processing rate of the consumer.
*/
- public override val speed: Double
- get() = ctx?.speed ?: 0.0
+ public override val rate: Double
+ get() = ctx?.rate ?: 0.0
/**
- * The resource processing speed demand at this instant.
+ * The flow processing rate demand at this instant.
*/
public override val demand: Double
get() = ctx?.demand ?: 0.0
/**
- * The resource counters to track the execution metrics of the resource.
+ * The flow counters to track the flow metrics of the consumer.
*/
- public override val counters: SimResourceCounters
+ public override val counters: FlowCounters
get() = _counters
- private val _counters = SimResourceCountersImpl()
+ private val _counters = FlowCountersImpl()
/**
- * The [SimResourceControllableContext] that is currently running.
+ * The [FlowConsumerContext] that is currently running.
*/
- protected var ctx: SimResourceControllableContext? = null
+ protected var ctx: FlowConsumerContext? = null
private set
/**
- * Construct the [SimResourceProviderLogic] instance for a new consumer.
+ * Construct the [FlowConsumerLogic] instance for a new source.
*/
- protected abstract fun createLogic(): SimResourceProviderLogic
+ protected abstract fun createLogic(): FlowConsumerLogic
/**
- * Start the specified [SimResourceControllableContext].
+ * Start the specified [FlowConsumerContext].
*/
- protected open fun start(ctx: SimResourceControllableContext) {
+ protected open fun start(ctx: FlowConsumerContext) {
ctx.start()
}
/**
- * The previous demand for the resource.
+ * The previous demand for the consumer.
*/
private var previousDemand = 0.0
/**
- * Update the counters of the resource provider.
+ * Update the counters of the flow consumer.
*/
- protected fun updateCounters(ctx: SimResourceContext, delta: Long) {
+ protected fun updateCounters(ctx: FlowConnection, delta: Long) {
val demand = previousDemand
previousDemand = ctx.demand
@@ -102,7 +99,7 @@ public abstract class SimAbstractResourceProvider(
val counters = _counters
val deltaS = delta / 1000.0
val work = demand * deltaS
- val actualWork = ctx.speed * deltaS
+ val actualWork = ctx.rate * deltaS
val remainingWork = work - actualWork
counters.demand += work
@@ -111,7 +108,7 @@ public abstract class SimAbstractResourceProvider(
}
/**
- * Update the counters of the resource provider.
+ * Update the counters of the flow consumer.
*/
protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
val counters = _counters
@@ -120,9 +117,9 @@ public abstract class SimAbstractResourceProvider(
counters.overcommit += overcommit
}
- final override fun startConsumer(consumer: SimResourceConsumer) {
- check(ctx == null) { "Resource is in invalid state" }
- val ctx = interpreter.newContext(consumer, createLogic())
+ final override fun startConsumer(source: FlowSource) {
+ check(ctx == null) { "Consumer is in invalid state" }
+ val ctx = engine.newContext(source, createLogic())
ctx.capacity = capacity
this.ctx = ctx
@@ -130,8 +127,8 @@ public abstract class SimAbstractResourceProvider(
start(ctx)
}
- final override fun interrupt() {
- ctx?.interrupt()
+ final override fun pull() {
+ ctx?.pull()
}
final override fun cancel() {
@@ -142,5 +139,5 @@ public abstract class SimAbstractResourceProvider(
}
}
- override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]"
+ override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
index 225cae0b..fa833961 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -20,49 +20,41 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
-
-import java.time.Clock
+package org.opendc.simulator.flow
/**
- * The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
- * resource and a resource consumer.
+ * An active connection between a [FlowSource] and [FlowConsumer].
*/
-public interface SimResourceContext : AutoCloseable {
- /**
- * The virtual clock tracking simulation time.
- */
- public val clock: Clock
-
+public interface FlowConnection : AutoCloseable {
/**
- * The resource capacity available at this instant.
+ * The capacity of the connection.
*/
public val capacity: Double
/**
- * The resource processing speed at this instant.
+ * The flow rate over the connection.
*/
- public val speed: Double
+ public val rate: Double
/**
- * The resource processing speed demand at this instant.
+ * The flow demand of the source.
*/
public val demand: Double
/**
- * Ask the resource provider to interrupt its resource.
+ * Pull the source.
*/
- public fun interrupt()
+ public fun pull()
/**
- * Push the given flow to this context.
+ * Push the given flow [rate] over this connection.
*
* @param rate The rate of the flow to push.
*/
public fun push(rate: Double)
/**
- * Stop the resource context.
+ * Disconnect the consumer from its source.
*/
public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index b68b7261..3a6e2e97 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -20,77 +20,80 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
- * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
+ * A consumer of a [FlowSource].
*/
-public interface SimResourceProvider {
+public interface FlowConsumer {
/**
- * A flag to indicate that the resource provider is currently being consumed by a [SimResourceConsumer].
+ * A flag to indicate that the consumer is currently consuming a [FlowSource].
*/
public val isActive: Boolean
/**
- * The resource capacity available at this instant.
+ * The flow capacity of this consumer.
*/
public val capacity: Double
/**
- * The current processing speed of the resource.
+ * The current flow rate of the consumer.
*/
- public val speed: Double
+ public val rate: Double
/**
- * The resource processing speed demand at this instant.
+ * The current flow demand.
*/
public val demand: Double
/**
- * The resource counters to track the execution metrics of the resource.
+ * The flow counters to track the flow metrics of the consumer.
*/
- public val counters: SimResourceCounters
+ public val counters: FlowCounters
/**
- * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
+ * Start consuming the specified [source].
*
- * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
+ * @throws IllegalStateException if the consumer is already active.
*/
- public fun startConsumer(consumer: SimResourceConsumer)
+ public fun startConsumer(source: FlowSource)
/**
- * Interrupt the resource consumer. If there is no consumer active, this operation will be a no-op.
+ * Ask the consumer to pull its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
*/
- public fun interrupt()
+ public fun pull()
/**
- * Cancel the current resource consumer. If there is no consumer active, this operation will be a no-op.
+ * Disconnect the consumer from its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
*/
public fun cancel()
}
/**
- * Consume the resource provided by this provider using the specified [consumer] and suspend execution until
- * the consumer has finished.
+ * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
*/
-public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
+public suspend fun FlowConsumer.consume(source: FlowSource) {
return suspendCancellableCoroutine { cont ->
- startConsumer(object : SimResourceConsumer by consumer {
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- consumer.onEvent(ctx, event)
+ startConsumer(object : FlowSource by source {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ source.onEvent(conn, now, event)
- if (event == SimResourceEvent.Exit && !cont.isCompleted) {
+ if (event == FlowEvent.Exit && !cont.isCompleted) {
cont.resume(Unit)
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
try {
- consumer.onFailure(ctx, cause)
+ source.onFailure(conn, cause)
cont.resumeWithException(cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
@@ -98,7 +101,7 @@ public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
}
}
- override fun toString(): String = "SimSuspendingResourceConsumer"
+ override fun toString(): String = "SuspendingFlowSource"
})
cont.invokeOnCancellation { cancel() }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
new file mode 100644
index 00000000..75b2d25b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A controllable [FlowConnection].
+ *
+ * This interface is used by [FlowConsumer]s to control the connection between it and the source.
+ */
+public interface FlowConsumerContext : FlowConnection {
+ /**
+ * The capacity of the connection.
+ */
+ public override var capacity: Double
+
+ /**
+ * Start the flow over the connection.
+ */
+ public fun start()
+
+ /**
+ * Synchronously flush the changes of the connection.
+ */
+ public fun flush()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index cc718165..c69cb17e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -20,23 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A collection of callbacks associated with a flow stage.
+ * A collection of callbacks associated with a [FlowConsumer].
*/
-public interface SimResourceProviderLogic {
+public interface FlowConsumerLogic {
/**
- * This method is invoked when the consumer ask to consume the resource for the specified [duration].
+ * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds.
- * @param limit The limit on the work rate of the resource consumer.
- * @param duration The duration of the consumption in milliseconds.
- * @return The deadline of the resource consumption.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ * @param rate The requested processing rate of the source.
*/
- public fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {}
+ public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -45,14 +43,14 @@ public interface SimResourceProviderLogic {
* @param now The virtual timestamp in milliseconds at which the system converged.
* @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {}
+ public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
/**
- * This method is invoked when the resource consumer has finished.
+ * This method is invoked when the [FlowSource] is completed.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
*/
- public fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {}
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index 11924db2..e15d7643 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -20,34 +20,34 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * An interface that tracks cumulative counts of the work performed by a resource.
+ * An interface that tracks cumulative counts of the flow accumulation over a stage.
*/
-public interface SimResourceCounters {
+public interface FlowCounters {
/**
- * The amount of work that resource consumers wanted the resource to perform.
+ * The accumulated flow that a source wanted to push over the connection.
*/
public val demand: Double
/**
- * The amount of work performed by the resource.
+ * The accumulated flow that was actually transferred over the connection.
*/
public val actual: Double
/**
- * The amount of work that could not be completed due to overcommitted resources.
+ * The accumulated flow that could not be transferred over the connection.
*/
public val overcommit: Double
/**
- * The amount of work lost due to interference.
+ * The accumulated flow lost due to interference between sources.
*/
public val interference: Double
/**
- * Reset the resource counters.
+ * Reset the flow counters.
*/
public fun reset()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
index 4bfeaf20..65224827 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
@@ -20,31 +20,31 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
import java.time.Clock
import kotlin.coroutines.CoroutineContext
/**
- * The resource interpreter is responsible for managing the interaction between resource consumer and provider.
+ * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
*
- * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation
+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
* to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
*/
-public interface SimResourceInterpreter {
+public interface FlowEngine {
/**
- * The [Clock] associated with this interpreter.
+ * The virtual [Clock] associated with this engine.
*/
public val clock: Clock
/**
- * Create a new [SimResourceControllableContext] with the given [provider].
+ * Create a new [FlowConsumerContext] with the given [provider].
*
* @param consumer The consumer logic.
* @param provider The logic of the resource provider.
*/
- public fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext
+ public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
/**
* Start batching the execution of resource updates until [popBatch] is called.
@@ -67,14 +67,15 @@ public interface SimResourceInterpreter {
public companion object {
/**
- * Construct a new [SimResourceInterpreter] implementation.
+ * Construct a new [FlowEngine] implementation.
*
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
+ @JvmStatic
@JvmName("create")
- public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter {
- return SimResourceInterpreterImpl(context, clock)
+ public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
+ return FlowEngineImpl(context, clock)
}
}
}
@@ -84,7 +85,7 @@ public interface SimResourceInterpreter {
*
* This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
*/
-public inline fun SimResourceInterpreter.batch(block: () -> Unit) {
+public inline fun FlowEngine.batch(block: () -> Unit) {
try {
pushBatch()
block()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
index 959427f1..14c85183 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
@@ -20,29 +20,29 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A resource event that is communicated to the resource consumer.
+ * A flow event that is communicated to a [FlowSource].
*/
-public enum class SimResourceEvent {
+public enum class FlowEvent {
/**
- * This event is emitted to the consumer when it has started.
+ * This event is emitted to the source when it has started.
*/
Start,
/**
- * This event is emitted to the consumer when it has exited.
+ * This event is emitted to the source when it is stopped.
*/
Exit,
/**
- * This event is emitted to the consumer when it has started a new resource consumption or idle cycle.
+ * This event is emitted to the source when the system has converged into a steady state.
*/
- Run,
+ Converge,
/**
- * This event is emitted to the consumer when the capacity of the resource has changed.
+ * This event is emitted to the source when the capacity of the consumer has changed.
*/
Capacity,
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 0cd2bfc7..2074033e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -20,21 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
-import org.opendc.simulator.resources.impl.SimResourceCountersImpl
-import java.time.Clock
+import org.opendc.simulator.flow.internal.FlowCountersImpl
/**
- * A class that acts as a [SimResourceConsumer] and [SimResourceProvider] at the same time.
+ * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
*
+ * @param engine The [FlowEngine] the forwarder runs in.
* @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
*/
-public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimResourceConsumer, SimResourceProvider, AutoCloseable {
+public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable {
/**
- * The delegate [SimResourceConsumer].
+ * The delegate [FlowSource].
*/
- private var delegate: SimResourceConsumer? = null
+ private var delegate: FlowSource? = null
/**
* A flag to indicate that the delegate was started.
@@ -42,28 +42,25 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
private var hasDelegateStarted: Boolean = false
/**
- * The exposed [SimResourceContext].
+ * The exposed [FlowConnection].
*/
- private val _ctx = object : SimResourceContext {
- override val clock: Clock
- get() = _innerCtx!!.clock
-
+ private val _ctx = object : FlowConnection {
override val capacity: Double
get() = _innerCtx?.capacity ?: 0.0
override val demand: Double
get() = _innerCtx?.demand ?: 0.0
- override val speed: Double
- get() = _innerCtx?.speed ?: 0.0
+ override val rate: Double
+ get() = _innerCtx?.rate ?: 0.0
- override fun interrupt() {
- _innerCtx?.interrupt()
+ override fun pull() {
+ _innerCtx?.pull()
}
override fun push(rate: Double) {
_innerCtx?.push(rate)
- _limit = rate
+ _demand = rate
}
override fun close() {
@@ -78,14 +75,14 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
// reset beforehand the existing state and check whether it has been updated afterwards
reset()
- delegate.onEvent(this, SimResourceEvent.Exit)
+ delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit)
}
}
/**
- * The [SimResourceContext] in which the forwarder runs.
+ * The [FlowConnection] in which the forwarder runs.
*/
- private var _innerCtx: SimResourceContext? = null
+ private var _innerCtx: FlowConnection? = null
override val isActive: Boolean
get() = delegate != null
@@ -93,27 +90,27 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
override val capacity: Double
get() = _ctx.capacity
- override val speed: Double
- get() = _ctx.speed
+ override val rate: Double
+ get() = _ctx.rate
override val demand: Double
get() = _ctx.demand
- override val counters: SimResourceCounters
+ override val counters: FlowCounters
get() = _counters
- private val _counters = SimResourceCountersImpl()
+ private val _counters = FlowCountersImpl()
- override fun startConsumer(consumer: SimResourceConsumer) {
- check(delegate == null) { "Resource transformer already active" }
+ override fun startConsumer(source: FlowSource) {
+ check(delegate == null) { "Forwarder already active" }
- delegate = consumer
+ delegate = source
- // Interrupt the provider to replace the consumer
- interrupt()
+ // Pull to replace the source
+ pull()
}
- override fun interrupt() {
- _ctx.interrupt()
+ override fun pull() {
+ _ctx.pull()
}
override fun cancel() {
@@ -124,7 +121,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
this.delegate = null
if (ctx != null) {
- delegate.onEvent(this._ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit)
}
}
}
@@ -134,41 +131,41 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
if (ctx != null) {
this._innerCtx = null
- ctx.interrupt()
+ ctx.pull()
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
start()
}
- updateCounters(ctx, delta)
+ updateCounters(conn, delta)
- return delegate?.onNext(this._ctx, now, delta) ?: Long.MAX_VALUE
+ return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> {
- _innerCtx = ctx
+ FlowEvent.Start -> {
+ _innerCtx = conn
}
- SimResourceEvent.Exit -> {
+ FlowEvent.Exit -> {
_innerCtx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onEvent(this._ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this._ctx, now, FlowEvent.Exit)
}
}
- else -> delegate?.onEvent(this._ctx, event)
+ else -> delegate?.onEvent(this._ctx, now, event)
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
_innerCtx = null
val delegate = delegate
@@ -183,7 +180,7 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onEvent(checkNotNull(_innerCtx), SimResourceEvent.Start)
+ delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start)
hasDelegateStarted = true
}
@@ -197,22 +194,22 @@ public class SimResourceForwarder(private val isCoupled: Boolean = false) : SimR
}
/**
- * The requested speed.
+ * The requested flow rate.
*/
- private var _limit: Double = 0.0
+ private var _demand: Double = 0.0
/**
- * Update the resource counters for the transformer.
+ * Update the flow counters for the transformer.
*/
- private fun updateCounters(ctx: SimResourceContext, delta: Long) {
+ private fun updateCounters(ctx: FlowConnection, delta: Long) {
if (delta <= 0) {
return
}
val counters = _counters
val deltaS = delta / 1000.0
- val work = _limit * deltaS
- val actualWork = ctx.speed * deltaS
+ val work = _demand * deltaS
+ val actualWork = ctx.rate * deltaS
counters.demand += work
counters.actual += actualWork
counters.overcommit += (work - actualWork)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index c8d4cf0d..fb6ca85d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -20,42 +20,42 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity.
+ * A [FlowSink] represents a sink with a fixed capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param interpreter The interpreter that is used for managing the resource contexts.
- * @param parent The parent resource system.
+ * @param engine The engine that is used for driving the flow simulation.
+ * @param parent The parent flow system.
*/
-public class SimResourceSource(
+public class FlowSink(
+ private val engine: FlowEngine,
initialCapacity: Double,
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null
-) : SimAbstractResourceProvider(interpreter, initialCapacity) {
- override fun createLogic(): SimResourceProviderLogic {
- return object : SimResourceProviderLogic {
- override fun onConsume(
- ctx: SimResourceControllableContext,
+ private val parent: FlowSystem? = null
+) : AbstractFlowConsumer(engine, initialCapacity) {
+
+ override fun createLogic(): FlowConsumerLogic {
+ return object : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
now: Long,
delta: Long,
- limit: Double,
- duration: Long
+ rate: Double
) {
updateCounters(ctx, delta)
}
- override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
updateCounters(ctx, delta)
cancel()
}
- override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
parent?.onConverge(now)
}
}
}
- override fun toString(): String = "SimResourceSource[capacity=$capacity]"
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 0b25358a..077b4d38 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -20,38 +20,39 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
- * A [SimResourceConsumer] characterizes how a resource is consumed.
+ * A source of flow that is consumed by a [FlowConsumer].
*
- * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
- * for multiple resource providers, unless explicitly said otherwise.
+ * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
+ * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
*/
-public interface SimResourceConsumer {
+public interface FlowSource {
/**
- * This method is invoked when the resource provider is pulling this resource consumer.
+ * This method is invoked when the source is pulled.
*
- * @param ctx The execution context in which the consumer runs.
- * @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call in milliseconds.
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the pull is occurring.
+ * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
* @return The duration after which the resource consumer should be pulled again.
*/
- public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long
+ public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
/**
* This method is invoked when an event has occurred.
*
- * @param ctx The execution context in which the consumer runs.
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the event is occurring.
* @param event The event that has occurred.
*/
- public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {}
+ public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {}
/**
- * This method is invoked when a resource consumer throws an exception.
+ * This method is invoked when the source throws an exception.
*
- * @param ctx The execution context in which the consumer runs.
+ * @param conn The connection between the source and consumer.
* @param cause The cause of the failure.
*/
- public fun onFailure(ctx: SimResourceContext, cause: Throwable) {}
+ public fun onFailure(conn: FlowConnection, cause: Throwable) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
index 609262cb..db6aa69f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
/**
* A system of possible multiple sub-resources.
@@ -28,11 +28,11 @@ package org.opendc.simulator.resources
* This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the
* resource provider.
*/
-public interface SimResourceSystem {
+public interface FlowSystem {
/**
* The parent system to which this system belongs or `null` if it has no parent.
*/
- public val parent: SimResourceSystem?
+ public val parent: FlowSystem?
/**
* This method is invoked when the system has converged to a steady-state.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
index 1066777f..aa2713b6 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceDomain.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
@@ -1,10 +1,10 @@
-package org.opendc.simulator.resources.interference
+package org.opendc.simulator.flow.interference
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.flow.FlowSource
/**
- * An interference domain represents a system of resources where [resource consumers][SimResourceConsumer] may incur
- * performance variability due to operating on the same resources and therefore causing interference.
+ * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur
+ * performance variability due to operating on the same resource and therefore causing interference.
*/
public interface InterferenceDomain {
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
index 8b12e7b4..d28ebde5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/interference/InterferenceKey.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.interference
+package org.opendc.simulator.flow.interference
/**
* A key that uniquely identifies a participant of an interference domain.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index cbfa7afd..9f3afc4d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -20,28 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.impl
+package org.opendc.simulator.flow.internal
-import org.opendc.simulator.resources.*
-import java.time.Clock
+import org.opendc.simulator.flow.*
import java.util.ArrayDeque
import kotlin.math.max
import kotlin.math.min
/**
- * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
*/
-internal class SimResourceContextImpl(
- private val interpreter: SimResourceInterpreterImpl,
- private val consumer: SimResourceConsumer,
- private val logic: SimResourceProviderLogic
-) : SimResourceControllableContext {
+internal class FlowConsumerContextImpl(
+ private val engine: FlowEngineImpl,
+ private val source: FlowSource,
+ private val logic: FlowConsumerLogic
+) : FlowConsumerContext {
/**
- * The clock of the context.
+ * The clock to track simulation time.
*/
- override val clock: Clock
- get() = _clock
- private val _clock = interpreter.clock
+ private val _clock = engine.clock
/**
* The capacity of the resource.
@@ -65,7 +62,7 @@ internal class SimResourceContextImpl(
/**
* The current processing speed of the resource.
*/
- override val speed: Double
+ override val rate: Double
get() = _rate
private var _rate = 0.0
@@ -101,14 +98,14 @@ internal class SimResourceContextImpl(
/**
* The timers at which the context is scheduled to be interrupted.
*/
- private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque()
+ private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque()
override fun start() {
check(_state == State.Pending) { "Consumer is already started" }
- interpreter.batch {
- consumer.onEvent(this, SimResourceEvent.Start)
+ engine.batch {
+ source.onEvent(this, _clock.millis(), FlowEvent.Start)
_state = State.Active
- interrupt()
+ pull()
}
}
@@ -117,36 +114,27 @@ internal class SimResourceContextImpl(
return
}
- interpreter.batch {
+ engine.batch {
_state = State.Stopped
if (!_updateActive) {
- val now = clock.millis()
+ val now = _clock.millis()
val delta = max(0, now - _lastUpdate)
doStop(now, delta)
// FIX: Make sure the context converges
_flag = _flag or FLAG_INVALIDATE
- scheduleUpdate(clock.millis())
+ scheduleUpdate(_clock.millis())
}
}
}
- override fun interrupt() {
+ override fun pull() {
if (_state == State.Stopped) {
return
}
_flag = _flag or FLAG_INTERRUPT
- scheduleUpdate(clock.millis())
- }
-
- override fun invalidate() {
- if (_state == State.Stopped) {
- return
- }
-
- _flag = _flag or FLAG_INVALIDATE
- scheduleUpdate(clock.millis())
+ scheduleUpdate(_clock.millis())
}
override fun flush() {
@@ -154,7 +142,7 @@ internal class SimResourceContextImpl(
return
}
- interpreter.scheduleSync(clock.millis(), this)
+ engine.scheduleSync(_clock.millis(), this)
}
override fun push(rate: Double) {
@@ -167,7 +155,8 @@ internal class SimResourceContextImpl(
// Invalidate only if the active limit is change and no update is active
// If an update is active, it will already get picked up at the end of the update
if (_activeLimit != rate && !_updateActive) {
- invalidate()
+ _flag = _flag or FLAG_INVALIDATE
+ scheduleUpdate(_clock.millis())
}
}
@@ -196,7 +185,7 @@ internal class SimResourceContextImpl(
val delta = max(0, now - lastUpdate)
try {
- val duration = consumer.onNext(this, now, delta)
+ val duration = source.onPull(this, now, delta)
val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration
// Reset update flags
@@ -205,7 +194,7 @@ internal class SimResourceContextImpl(
// Check whether the state has changed after [consumer.onNext]
when (_state) {
State.Active -> {
- logic.onConsume(this, now, delta, _limit, duration)
+ logic.onPush(this, now, delta, _limit)
// Schedule an update at the new deadline
scheduleUpdate(now, newDeadline)
@@ -261,7 +250,7 @@ internal class SimResourceContextImpl(
try {
if (_state == State.Active) {
- consumer.onEvent(this, SimResourceEvent.Run)
+ source.onEvent(this, timestamp, FlowEvent.Converge)
}
logic.onConverge(this, timestamp, delta)
@@ -270,14 +259,14 @@ internal class SimResourceContextImpl(
}
}
- override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]"
+ override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
/**
* Stop the resource context.
*/
private fun doStop(now: Long, delta: Long) {
try {
- consumer.onEvent(this, SimResourceEvent.Exit)
+ source.onEvent(this, now, FlowEvent.Exit)
logic.onFinish(this, now, delta)
} catch (cause: Throwable) {
doFail(now, delta, cause)
@@ -292,7 +281,7 @@ internal class SimResourceContextImpl(
*/
private fun doFail(now: Long, delta: Long, cause: Throwable) {
try {
- consumer.onFailure(this, cause)
+ source.onFailure(this, cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
e.printStackTrace()
@@ -310,11 +299,11 @@ internal class SimResourceContextImpl(
return
}
- interpreter.batch {
+ engine.batch {
// Inform the consumer of the capacity change. This might already trigger an interrupt.
- consumer.onEvent(this, SimResourceEvent.Capacity)
+ source.onEvent(this, _clock.millis(), FlowEvent.Capacity)
- interrupt()
+ pull()
}
}
@@ -322,7 +311,7 @@ internal class SimResourceContextImpl(
* Schedule an update for this resource context.
*/
private fun scheduleUpdate(now: Long) {
- interpreter.scheduleImmediate(now, this)
+ engine.scheduleImmediate(now, this)
}
/**
@@ -331,7 +320,7 @@ internal class SimResourceContextImpl(
private fun scheduleUpdate(now: Long, target: Long) {
val timers = _timers
if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) {
- timers.addFirst(interpreter.scheduleDelayed(now, this, target))
+ timers.addFirst(engine.scheduleDelayed(now, this, target))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
index 01062179..141d335d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -20,14 +20,14 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.impl
+package org.opendc.simulator.flow.internal
-import org.opendc.simulator.resources.SimResourceCounters
+import org.opendc.simulator.flow.FlowCounters
/**
- * Mutable implementation of the [SimResourceCounters] interface.
+ * Mutable implementation of the [FlowCounters] interface.
*/
-internal class SimResourceCountersImpl : SimResourceCounters {
+internal class FlowCountersImpl : FlowCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
override var overcommit: Double = 0.0
@@ -41,6 +41,6 @@ internal class SimResourceCountersImpl : SimResourceCounters {
}
override fun toString(): String {
- return "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 2abf0749..1a50da2c 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -20,25 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.impl
+package org.opendc.simulator.flow.internal
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
import java.time.Clock
import java.util.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
/**
- * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after.
+ * Internal implementation of the [FlowEngine] interface.
*
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
-internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter {
+internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine {
/**
* The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
@@ -46,24 +46,24 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
/**
- * The queue of resource updates that are scheduled for immediate execution.
+ * The queue of connection updates that are scheduled for immediate execution.
*/
- private val queue = ArrayDeque<SimResourceContextImpl>()
+ private val queue = ArrayDeque<FlowConsumerContextImpl>()
/**
- * A priority queue containing the resource updates to be scheduled in the future.
+ * A priority queue containing the connection updates to be scheduled in the future.
*/
private val futureQueue = PriorityQueue<Timer>()
/**
- * The stack of interpreter invocations to occur in the future.
+ * The stack of engine invocations to occur in the future.
*/
private val futureInvocations = ArrayDeque<Invocation>()
/**
- * The systems that have been visited during the interpreter cycle.
+ * The systems that have been visited during the engine cycle.
*/
- private val visited = linkedSetOf<SimResourceContextImpl>()
+ private val visited = linkedSetOf<FlowConsumerContextImpl>()
/**
* The index in the batch stack.
@@ -71,7 +71,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
private var batchIndex = 0
/**
- * A flag to indicate that the interpreter is currently active.
+ * A flag to indicate that the engine is currently active.
*/
private val isRunning: Boolean
get() = batchIndex > 0
@@ -79,57 +79,57 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* Update the specified [ctx] synchronously.
*/
- fun scheduleSync(now: Long, ctx: SimResourceContextImpl) {
+ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
ctx.doUpdate(now)
visited.add(ctx)
- // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
- // up by the active interpreter.
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
if (isRunning) {
return
}
try {
batchIndex++
- runInterpreter(now)
+ runEngine(now)
} finally {
batchIndex--
}
}
/**
- * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle.
+ * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
*
- * This method should be used when the state of a resource context is invalidated/interrupted and needs to be
- * re-computed. In case no interpreter is currently active, the interpreter will be started.
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed. In case no engine is currently active, the engine will be started.
*/
- fun scheduleImmediate(now: Long, ctx: SimResourceContextImpl) {
+ fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
queue.add(ctx)
- // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
- // up by the active interpreter.
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
if (isRunning) {
return
}
try {
batchIndex++
- runInterpreter(now)
+ runEngine(now)
} finally {
batchIndex--
}
}
/**
- * Schedule the interpreter to run at [target] to update the resource contexts.
+ * Schedule the engine to run at [target] to update the flow contexts.
*
* This method will override earlier calls to this method for the same [ctx].
*
* @param now The current virtual timestamp.
- * @param ctx The resource context to which the event applies.
+ * @param ctx The flow context to which the event applies.
* @param target The timestamp when the interrupt should happen.
*/
- fun scheduleDelayed(now: Long, ctx: SimResourceContextImpl, target: Long): Timer {
+ fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer {
val futureQueue = futureQueue
require(target >= now) { "Timestamp must be in the future" }
@@ -140,7 +140,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
return timer
}
- override fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext = SimResourceContextImpl(this, consumer, provider)
+ override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
override fun pushBatch() {
batchIndex++
@@ -150,7 +150,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
try {
// Flush the work if the platform is not already running
if (batchIndex == 1 && queue.isNotEmpty()) {
- runInterpreter(clock.millis())
+ runEngine(clock.millis())
}
} finally {
batchIndex--
@@ -158,9 +158,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * Interpret all actions that are scheduled for the current timestamp.
+ * Run all the enqueued actions for the specified [timestamp][now].
*/
- private fun runInterpreter(now: Long) {
+ private fun runEngine(now: Long) {
val queue = queue
val futureQueue = futureQueue
val futureInvocations = futureInvocations
@@ -219,7 +219,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
visited.clear()
} while (queue.isNotEmpty())
- // Schedule an interpreter invocation for the next update to occur.
+ // Schedule an engine invocation for the next update to occur.
val headTimer = futureQueue.peek()
if (headTimer != null) {
trySchedule(now, futureInvocations, headTimer.target)
@@ -227,10 +227,10 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * Try to schedule an interpreter invocation at the specified [target].
+ * Try to schedule an engine invocation at the specified [target].
*
* @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the interpreter invocation should happen.
+ * @param target The virtual timestamp at which the engine invocation should happen.
* @param scheduled The queue of scheduled invocations.
*/
private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
@@ -245,7 +245,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
{
try {
batchIndex++
- runInterpreter(target)
+ runEngine(target)
} finally {
batchIndex--
}
@@ -267,9 +267,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * A future interpreter invocation.
+ * A future engine invocation.
*
- * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case
+ * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
* the invocation is not needed anymore, it can be cancelled via [cancel].
*/
private data class Invocation(
@@ -277,7 +277,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
@JvmField val handle: DisposableHandle
) {
/**
- * Cancel the interpreter invocation.
+ * Cancel the engine invocation.
*/
fun cancel() = handle.dispose()
}
@@ -285,10 +285,9 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* An update call for [ctx] that is scheduled for [target].
*
- * This class represents an update in the future at [target] requested by [ctx]. A deferred update might be
- * cancelled if the resource context was invalidated in the meantime.
+ * This class represents an update in the future at [target] requested by [ctx].
*/
- class Timer(@JvmField val ctx: SimResourceContextImpl, @JvmField val target: Long) : Comparable<Timer> {
+ class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> {
override fun compareTo(other: Timer): Int {
return target.compareTo(other.target)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 3c25b76d..17b82391 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -20,45 +20,48 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.mux
-import org.opendc.simulator.resources.interference.InterferenceKey
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowCounters
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.interference.InterferenceKey
/**
- * A [SimResourceSwitch] enables switching of capacity of multiple resources between multiple consumers.
+ * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
*/
-public interface SimResourceSwitch {
+public interface FlowMultiplexer {
/**
- * The output resource providers to which resource consumers can be attached.
+ * The inputs of the multiplexer that can be used to consume sources.
*/
- public val outputs: Set<SimResourceProvider>
+ public val inputs: Set<FlowConsumer>
/**
- * The input resources that will be switched between the output providers.
+ * The outputs of the multiplexer over which the flows will be distributed.
*/
- public val inputs: Set<SimResourceProvider>
+ public val outputs: Set<FlowConsumer>
/**
- * The resource counters to track the execution metrics of all switch resources.
+ * The flow counters to track the flow metrics of all multiplexer inputs.
*/
- public val counters: SimResourceCounters
+ public val counters: FlowCounters
/**
- * Create a new output on the switch.
+ * Create a new input on this multiplexer.
*
- * @param key The key of the interference member to which the output belongs.
+ * @param key The key of the interference member to which the input belongs.
*/
- public fun newOutput(key: InterferenceKey? = null): SimResourceProvider
+ public fun newInput(key: InterferenceKey? = null): FlowConsumer
/**
- * Remove [output] from this switch.
+ * Remove [input] from this multiplexer.
*/
- public fun removeOutput(output: SimResourceProvider)
+ public fun removeInput(input: FlowConsumer)
/**
- * Add the specified [input] to the switch.
+ * Add the specified [output] to the multiplexer.
*/
- public fun addInput(input: SimResourceProvider)
+ public fun addOutput(output: FlowConsumer)
/**
* Clear all inputs and outputs from the switch.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
new file mode 100644
index 00000000..811d9460
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
+import java.util.ArrayDeque
+
+/**
+ * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
+ * that a single input is directly connected to an output and that the multiplexer can only support as many
+ * inputs as outputs.
+ *
+ * @param engine The [FlowEngine] driving the simulation.
+ */
+public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer {
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+
+ override val outputs: Set<FlowConsumer>
+ get() = _outputs
+ private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _availableOutputs = ArrayDeque<FlowForwarder>()
+
+ override val counters: FlowCounters = object : FlowCounters {
+ override val demand: Double
+ get() = _outputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _outputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _outputs.sumOf { it.counters.overcommit }
+ override val interference: Double
+ get() = _outputs.sumOf { it.counters.interference }
+
+ override fun reset() {
+ for (input in _outputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val output = Input(forwarder)
+ _inputs += output
+ return output
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+
+ (input as Input).close()
+ }
+
+ override fun addOutput(output: FlowConsumer) {
+ if (output in outputs) {
+ return
+ }
+
+ val forwarder = FlowForwarder(engine)
+
+ _outputs += output
+ _availableOutputs += forwarder
+
+ output.startConsumer(object : FlowSource by forwarder {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Exit) {
+ // De-register the output after it has finished
+ _outputs -= output
+ }
+
+ forwarder.onEvent(conn, now, event)
+ }
+ })
+ }
+
+ override fun clear() {
+ for (input in _outputs) {
+ input.cancel()
+ }
+ _outputs.clear()
+
+ // Inputs are implicitly cancelled by the output forwarders
+ _inputs.clear()
+ }
+
+ /**
+ * An input on the multiplexer.
+ */
+ private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
+ /**
+ * Close the input.
+ */
+ fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across input resources.
+ _inputs -= this
+ _availableOutputs += forwarder
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
new file mode 100644
index 00000000..9735f121
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -0,0 +1,399 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceDomain
+import org.opendc.simulator.flow.interference.InterferenceKey
+import org.opendc.simulator.flow.internal.FlowCountersImpl
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
+ *
+ * @param engine The [FlowEngine] to drive the flow simulation.
+ * @param parent The parent flow system of the multiplexer.
+ * @param interferenceDomain The interference domain of the multiplexer.
+ */
+public class MaxMinFlowMultiplexer(
+ private val engine: FlowEngine,
+ private val parent: FlowSystem? = null,
+ private val interferenceDomain: InterferenceDomain? = null
+) : FlowMultiplexer {
+ /**
+ * The inputs of the multiplexer.
+ */
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+ private val _activeInputs = mutableListOf<Input>()
+
+ /**
+ * The outputs of the multiplexer.
+ */
+ override val outputs: Set<FlowConsumer>
+ get() = _outputs
+ private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _activeOutputs = mutableListOf<Output>()
+
+ /**
+ * The flow counters of this multiplexer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = FlowCountersImpl()
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ private var _rate = 0.0
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ private var _demand = 0.0
+
+ /**
+ * The capacity of the outputs.
+ */
+ private var _capacity = 0.0
+
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val provider = Input(_capacity, key)
+ _inputs.add(provider)
+ return provider
+ }
+
+ override fun addOutput(output: FlowConsumer) {
+ val consumer = Output(output)
+ if (_outputs.add(output)) {
+ _activeOutputs.add(consumer)
+ output.startConsumer(consumer)
+ }
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+ // This cast should always succeed since only `Input` instances should be added to `_inputs`
+ (input as Input).close()
+ }
+
+ override fun clear() {
+ for (input in _activeOutputs) {
+ input.cancel()
+ }
+ _activeOutputs.clear()
+
+ for (output in _activeInputs) {
+ output.cancel()
+ }
+ _activeInputs.clear()
+ }
+
+ /**
+ * Converge the scheduler of the multiplexer.
+ */
+ private fun runScheduler(now: Long) {
+ if (_schedulerActive) {
+ return
+ }
+
+ _schedulerActive = true
+ try {
+ doSchedule(now)
+ } finally {
+ _schedulerActive = false
+ }
+ }
+
+ /**
+ * Schedule the inputs over the outputs.
+ */
+ private fun doSchedule(now: Long) {
+ val activeInputs = _activeInputs
+ val activeOutputs = _activeOutputs
+
+ // If there is no work yet, mark the inputs as idle.
+ if (activeInputs.isEmpty()) {
+ return
+ }
+
+ val capacity = _capacity
+ var availableCapacity = capacity
+
+ // Pull in the work of the outputs
+ val inputIterator = activeInputs.listIterator()
+ for (input in inputIterator) {
+ input.pull(now)
+
+ // Remove outputs that have finished
+ if (!input.isActive) {
+ inputIterator.remove()
+ }
+ }
+
+ var demand = 0.0
+
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeInputs.sort()
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ var remaining = activeInputs.size
+ for (input in activeInputs) {
+ val availableShare = availableCapacity / remaining--
+ val grantedRate = min(input.allowedRate, availableShare)
+
+ // Ignore empty sources
+ if (grantedRate <= 0.0) {
+ input.actualRate = 0.0
+ continue
+ }
+
+ input.actualRate = grantedRate
+ demand += input.limit
+ availableCapacity -= grantedRate
+ }
+
+ val rate = capacity - availableCapacity
+
+ _demand = demand
+ _rate = rate
+
+ // Sort all consumers by their capacity
+ activeOutputs.sort()
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (output in activeOutputs) {
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
+ }
+
+ /**
+ * Recompute the capacity of the multiplexer.
+ */
+ private fun updateCapacity() {
+ val newCapacity = _activeOutputs.sumOf(Output::capacity)
+
+ // No-op if the capacity is unchanged
+ if (_capacity == newCapacity) {
+ return
+ }
+
+ _capacity = newCapacity
+
+ for (input in _inputs) {
+ input.capacity = newCapacity
+ }
+ }
+
+ /**
+ * An internal [FlowConsumer] implementation for multiplexer inputs.
+ */
+ private inner class Input(capacity: Double, val key: InterferenceKey?) :
+ AbstractFlowConsumer(engine, capacity),
+ FlowConsumerLogic,
+ Comparable<Input> {
+ /**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ val allowedRate: Double
+ get() = min(capacity, limit)
+
+ /**
+ * A flag to indicate that the input is closed.
+ */
+ private var _isClosed: Boolean = false
+
+ /**
+ * The timestamp at which we received the last command.
+ */
+ private var _lastPull: Long = Long.MIN_VALUE
+
+ /**
+ * Close the input.
+ *
+ * This method is invoked when the user removes an input from the switch.
+ */
+ fun close() {
+ _isClosed = true
+ cancel()
+ }
+
+ /* AbstractFlowConsumer */
+ override fun createLogic(): FlowConsumerLogic = this
+
+ override fun start(ctx: FlowConsumerContext) {
+ check(!_isClosed) { "Cannot re-use closed input" }
+
+ _activeInputs += this
+ super.start(ctx)
+ }
+
+ /* FlowConsumerLogic */
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ doUpdateCounters(delta)
+
+ actualRate = 0.0
+ this.limit = rate
+ _lastPull = now
+
+ runScheduler(now)
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ doUpdateCounters(delta)
+
+ limit = 0.0
+ actualRate = 0.0
+ _lastPull = now
+ }
+
+ /* Comparable */
+ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
+
+ /**
+ * Pull the source if necessary.
+ */
+ fun pull(now: Long) {
+ val ctx = ctx
+ if (ctx != null && _lastPull < now) {
+ ctx.flush()
+ }
+ }
+
+ /**
+ * Helper method to update the flow counters of the multiplexer.
+ */
+ private fun doUpdateCounters(delta: Long) {
+ if (delta <= 0L) {
+ return
+ }
+
+ // Compute the performance penalty due to flow interference
+ val perfScore = if (interferenceDomain != null) {
+ val load = _rate / capacity
+ interferenceDomain.apply(key, load)
+ } else {
+ 1.0
+ }
+
+ val deltaS = delta / 1000.0
+ val work = limit * deltaS
+ val actualWork = actualRate * deltaS
+ val remainingWork = work - actualWork
+
+ updateCounters(work, actualWork, remainingWork)
+
+ val distCounters = _counters
+ distCounters.demand += work
+ distCounters.actual += actualWork
+ distCounters.overcommit += remainingWork
+ distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ }
+ }
+
+ /**
+ * An internal [FlowSource] implementation for multiplexer outputs.
+ */
+ private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ /**
+ * The active [FlowConnection] of this source.
+ */
+ private var _ctx: FlowConnection? = null
+
+ /**
+ * The capacity of this output.
+ */
+ val capacity: Double
+ get() = _ctx?.capacity ?: 0.0
+
+ /**
+ * Push the specified rate to the consumer.
+ */
+ fun push(rate: Double) {
+ _ctx?.push(rate)
+ }
+
+ /**
+ * Cancel this output.
+ */
+ fun cancel() {
+ provider.cancel()
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ runScheduler(now)
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> {
+ assert(_ctx == null) { "Source running concurrently" }
+ _ctx = conn
+ updateCapacity()
+ }
+ FlowEvent.Exit -> {
+ _ctx = null
+ updateCapacity()
+ }
+ FlowEvent.Capacity -> updateCapacity()
+ else -> {}
+ }
+ }
+
+ override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
index bf76711f..d9779c6a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -20,41 +20,37 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
import kotlin.math.roundToLong
/**
- * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization.
+ * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
*/
-public class SimWorkConsumer(
- private val work: Double,
- private val utilization: Double
-) : SimResourceConsumer {
+public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
init {
- require(work >= 0.0) { "Work must be positive" }
+ require(amount >= 0.0) { "Amount must be positive" }
require(utilization > 0.0) { "Utilization must be positive" }
}
- private var remainingWork = work
+ private var remainingAmount = amount
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val actualWork = ctx.speed * delta / 1000.0
- val limit = ctx.capacity * utilization
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumed = conn.rate * delta / 1000.0
+ val limit = conn.capacity * utilization
- remainingWork -= actualWork
+ remainingAmount -= consumed
- val remainingWork = remainingWork
- val duration = (remainingWork / limit * 1000).roundToLong()
+ val duration = (remainingAmount / limit * 1000).roundToLong()
return if (duration > 0) {
- ctx.push(limit)
+ conn.push(limit)
duration
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
index 52a42241..b3191ad3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimConsumerBarrier.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
@@ -20,13 +20,13 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
/**
- * The [SimConsumerBarrier] is a barrier that allows consumers to wait for a select number of other consumers to
- * complete, before proceeding its operation.
+ * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
+ * finish a pull, before proceeding its operation.
*/
-public class SimConsumerBarrier(public val parties: Int) {
+public class FlowSourceBarrier(public val parties: Int) {
private var counter = 0
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 46885640..7fcc0405 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -20,20 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
import kotlin.math.min
/**
* Helper class to expose an observable [speed] field describing the speed of the consumer.
*/
-public class SimSpeedConsumerAdapter(
- private val delegate: SimResourceConsumer,
+public class FlowSourceRateAdapter(
+ private val delegate: FlowSource,
private val callback: (Double) -> Unit = {}
-) : SimResourceConsumer by delegate {
+) : FlowSource by delegate {
/**
* The resource processing speed at this instant.
*/
@@ -49,34 +49,34 @@ public class SimSpeedConsumerAdapter(
callback(0.0)
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return delegate.onNext(ctx, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
val oldSpeed = speed
- delegate.onEvent(ctx, event)
+ delegate.onEvent(conn, now, event)
when (event) {
- SimResourceEvent.Run -> speed = ctx.speed
- SimResourceEvent.Capacity -> {
+ FlowEvent.Converge -> speed = conn.rate
+ FlowEvent.Capacity -> {
// Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
// need to update the current speed.
if (oldSpeed == speed) {
- speed = min(ctx.capacity, speed)
+ speed = min(conn.capacity, speed)
}
}
- SimResourceEvent.Exit -> speed = 0.0
+ FlowEvent.Exit -> speed = 0.0
else -> {}
}
}
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
speed = 0.0
- delegate.onFailure(ctx, cause)
+ delegate.onFailure(conn, cause)
}
- override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
+ override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
index 4c0e075c..4d3ae61a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -20,21 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources.consumer
+package org.opendc.simulator.flow.source
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceContext
-import org.opendc.simulator.resources.SimResourceEvent
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
/**
- * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
- * consumption for some period of time.
+ * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
*/
-public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
+public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
private var _iterator: Iterator<Fragment>? = null
private var _nextTarget = Long.MIN_VALUE
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
// Check whether the trace fragment was fully consumed, otherwise wait until we have done so
val nextTarget = _nextTarget
if (nextTarget > now) {
@@ -45,21 +44,21 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
return if (iterator.hasNext()) {
val fragment = iterator.next()
_nextTarget = now + fragment.duration
- ctx.push(fragment.usage)
+ conn.push(fragment.usage)
fragment.duration
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> {
- check(_iterator == null) { "Consumer already running" }
+ FlowEvent.Start -> {
+ check(_iterator == null) { "Source already running" }
_iterator = trace.iterator()
}
- SimResourceEvent.Exit -> {
+ FlowEvent.Exit -> {
_iterator = null
}
else -> {}
@@ -67,7 +66,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
}
/**
- * A fragment of the workload.
+ * A fragment of the tgrace.
*/
public data class Fragment(val duration: Long, val usage: Double)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
new file mode 100644
index 00000000..061ebea6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+
+/**
+ * A test suite for the [FlowConsumerContextImpl] class.
+ */
+class FlowConsumerContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ engine.scheduleSync(engine.clock.millis(), context)
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = FixedFlowSource(1.0, 1.0)
+
+ val logic = spyk(object : FlowConsumerLogic {})
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 1.0
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ engine.scheduleSync(engine.clock.millis(), context)
+
+ verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) }
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(0.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
+ }
+
+ @Test
+ fun testIdempotentCapacityChange() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 4200.0
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ }
+
+ @Test
+ fun testFailureNoInfiniteLoop() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Exit) throw IllegalStateException("onEvent")
+ }
+
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ throw IllegalStateException("onFailure")
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ delay(1)
+
+ verify(exactly = 1) { consumer.onFailure(any(), any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 49e60f68..cbc48a4e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import io.mockk.spyk
import io.mockk.verify
@@ -29,24 +29,24 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
- * A test suite for the [SimResourceForwarder] class.
+ * A test suite for the [FlowForwarder] class.
*/
-internal class SimResourceForwarderTest {
+internal class FlowForwarderTest {
@Test
fun testCancelImmediately() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
launch { source.consume(forwarder) }
- forwarder.consume(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
})
@@ -57,22 +57,22 @@ internal class SimResourceForwarderTest {
@Test
fun testCancel() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
launch { source.consume(forwarder) }
- forwarder.consume(object : SimResourceConsumer {
+ forwarder.consume(object : FlowSource {
var isFirst = true
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
10 * 1000
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
@@ -84,10 +84,11 @@ internal class SimResourceForwarderTest {
@Test
fun testState() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
}
@@ -108,11 +109,12 @@ internal class SimResourceForwarderTest {
@Test
fun testCancelPendingDelegate() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
})
@@ -120,16 +122,16 @@ internal class SimResourceForwarderTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
- val consumer = spyk(SimWorkConsumer(2000.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
source.startConsumer(forwarder)
yield()
@@ -137,17 +139,17 @@ internal class SimResourceForwarderTest {
yield()
forwarder.cancel()
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testCancelPropagation() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
- val consumer = spyk(SimWorkConsumer(2000.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
source.startConsumer(forwarder)
yield()
@@ -155,19 +157,19 @@ internal class SimResourceForwarderTest {
yield()
source.cancel()
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testExitPropagation() = runBlockingSimulation {
- val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(2000.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
}
@@ -181,11 +183,11 @@ internal class SimResourceForwarderTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(1.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
- val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
source.startConsumer(forwarder)
coroutineScope {
@@ -195,16 +197,16 @@ internal class SimResourceForwarderTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
}
@Test
fun testCounters() = runBlockingSimulation {
- val forwarder = SimResourceForwarder()
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val source = SimResourceSource(1.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
- val consumer = SimWorkConsumer(2.0, 1.0)
+ val consumer = FixedFlowSource(2.0, 1.0)
source.startConsumer(forwarder)
forwarder.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index e055daf7..010a985e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow
import io.mockk.every
import io.mockk.mockk
@@ -30,24 +30,24 @@ import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
/**
- * A test suite for the [SimResourceSource] class.
+ * A test suite for the [FlowSink] class.
*/
-internal class SimResourceSourceTest {
+internal class FlowSinkTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(4200.0, 1.0)
+ val consumer = FixedFlowSource(4200.0, 1.0)
val res = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
provider.consume(adapter)
@@ -56,10 +56,10 @@ internal class SimResourceSourceTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val provider = SimResourceSource(1.0, scheduler)
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(engine, 1.0)
- val consumer = spyk(SimWorkConsumer(2.0, 1.0))
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
coroutineScope {
launch { provider.consume(consumer) }
@@ -67,19 +67,19 @@ internal class SimResourceSourceTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
}
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(capacity, 2.0)
+ val consumer = FixedFlowSource(capacity, 2.0)
val res = mutableListOf<Double>()
- val adapter = SimSpeedConsumerAdapter(consumer, res::add)
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
provider.consume(adapter)
@@ -87,23 +87,23 @@ internal class SimResourceSourceTest {
}
/**
- * Test to see whether no infinite recursion occurs when interrupting during [SimResourceConsumer.onStart] or
- * [SimResourceConsumer.onNext].
+ * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
+ * [FlowSource.onPull].
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
return Long.MAX_VALUE
}
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- ctx.interrupt()
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ conn.pull()
}
}
@@ -112,28 +112,28 @@ internal class SimResourceSourceTest {
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
- lateinit var resCtx: SimResourceContext
+ val provider = FlowSink(engine, capacity)
+ lateinit var resCtx: FlowConnection
- val consumer = object : SimResourceConsumer {
+ val consumer = object : FlowSource {
var isFirst = true
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> resCtx = ctx
+ FlowEvent.Start -> resCtx = conn
else -> {}
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
4000
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
@@ -141,7 +141,7 @@ internal class SimResourceSourceTest {
launch {
yield()
- resCtx.interrupt()
+ resCtx.pull()
}
provider.consume(consumer)
@@ -150,12 +150,12 @@ internal class SimResourceSourceTest {
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
+ val consumer = mockk<FlowSource>(relaxUnitFun = true)
+ every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) }
.throws(IllegalStateException())
assertThrows<IllegalStateException> {
@@ -165,17 +165,17 @@ internal class SimResourceSourceTest {
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = object : SimResourceConsumer {
+ val consumer = object : FlowSource {
var isFirst = true
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
1000
} else {
throw IllegalStateException()
@@ -190,11 +190,11 @@ internal class SimResourceSourceTest {
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(capacity, 1.0)
+ val consumer = FixedFlowSource(capacity, 1.0)
assertThrows<IllegalStateException> {
coroutineScope {
@@ -206,11 +206,11 @@ internal class SimResourceSourceTest {
@Test
fun testCancelDuringConsumption() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = SimWorkConsumer(capacity, 1.0)
+ val consumer = FixedFlowSource(capacity, 1.0)
launch { provider.consume(consumer) }
delay(500)
@@ -225,12 +225,12 @@ internal class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, scheduler)
+ val provider = FlowSink(engine, capacity)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
}
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
index 49f2da5f..b503087e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.mux
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
@@ -28,13 +28,14 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+import org.opendc.simulator.flow.source.TraceFlowSource
/**
- * Test suite for the [SimResourceSwitchExclusive] class.
+ * Test suite for the [ForwardingFlowMultiplexer] class.
*/
internal class SimResourceSwitchExclusiveTest {
/**
@@ -42,29 +43,29 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val speed = mutableListOf<Double>()
val duration = 5 * 60L
val workload =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3500.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
),
)
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
- val forwarder = SimResourceForwarder()
- val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+ val forwarder = FlowForwarder(engine)
+ val adapter = FlowSourceRateAdapter(forwarder, speed::add)
source.startConsumer(adapter)
- switch.addInput(forwarder)
+ switch.addOutput(forwarder)
- val provider = switch.newOutput()
+ val provider = switch.newInput()
provider.consume(workload)
yield()
@@ -79,17 +80,17 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = SimWorkConsumer(duration * 3.2, 1.0)
+ val workload = FixedFlowSource(duration * 3.2, 1.0)
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
- switch.addInput(source)
+ switch.addOutput(source)
- val provider = switch.newOutput()
+ val provider = switch.newInput()
provider.consume(workload)
yield()
@@ -101,37 +102,37 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
- val workload = object : SimResourceConsumer {
+ val workload = object : FlowSource {
var isFirst = true
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- SimResourceEvent.Start -> isFirst = true
+ FlowEvent.Start -> isFirst = true
else -> {}
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
return if (isFirst) {
isFirst = false
- ctx.push(1.0)
+ conn.push(1.0)
duration
} else {
- ctx.close()
+ conn.close()
Long.MAX_VALUE
}
}
}
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
- switch.addInput(source)
+ switch.addOutput(source)
- val provider = switch.newOutput()
+ val provider = switch.newInput()
provider.consume(workload)
yield()
provider.consume(workload)
@@ -143,14 +144,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val engine = FlowEngineImpl(coroutineContext, clock)
- val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, scheduler)
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
- switch.addInput(source)
+ switch.addOutput(source)
- switch.newOutput()
- assertThrows<IllegalStateException> { switch.newOutput() }
+ switch.newInput()
+ assertThrows<IllegalStateException> { switch.newInput() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
index 03f90e21..089a8d78 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.mux
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -28,24 +28,26 @@ import kotlinx.coroutines.yield
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.TraceFlowSource
/**
- * Test suite for the [SimResourceSwitch] implementations
+ * Test suite for the [FlowMultiplexer] implementations
*/
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin(scheduler)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(scheduler)
- val sources = List(2) { SimResourceSource(2000.0, scheduler) }
- sources.forEach { switch.addInput(it) }
+ val sources = List(2) { FlowSink(scheduler, 2000.0) }
+ sources.forEach { switch.addOutput(it) }
- val provider = switch.newOutput()
- val consumer = SimWorkConsumer(2000.0, 1.0)
+ val provider = switch.newInput()
+ val consumer = FixedFlowSource(2000.0, 1.0)
try {
provider.consume(consumer)
@@ -60,24 +62,24 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L
val workload =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3500.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
),
)
- val switch = SimResourceSwitchMaxMin(scheduler)
- val provider = switch.newOutput()
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val provider = switch.newInput()
try {
- switch.addInput(SimResourceSource(3200.0, scheduler))
+ switch.addOutput(FlowSink(scheduler, 3200.0))
provider.consume(workload)
yield()
} finally {
@@ -97,34 +99,34 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
val duration = 5 * 60L
val workloadA =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3500.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 183.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
),
)
val workloadB =
- SimTraceConsumer(
+ TraceFlowSource(
sequenceOf(
- SimTraceConsumer.Fragment(duration * 1000, 28.0),
- SimTraceConsumer.Fragment(duration * 1000, 3100.0),
- SimTraceConsumer.Fragment(duration * 1000, 0.0),
- SimTraceConsumer.Fragment(duration * 1000, 73.0)
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3100.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 73.0)
)
)
- val switch = SimResourceSwitchMaxMin(scheduler)
- val providerA = switch.newOutput()
- val providerB = switch.newOutput()
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val providerA = switch.newInput()
+ val providerB = switch.newInput()
try {
- switch.addInput(SimResourceSource(3200.0, scheduler))
+ switch.addOutput(FlowSink(scheduler, 3200.0))
coroutineScope {
launch { providerA.consume(workloadA) }
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
index 830f16d3..8396d346 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
@@ -20,24 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.resources
+package org.opendc.simulator.flow.source
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
/**
- * A test suite for the [SimWorkConsumer] class.
+ * A test suite for the [FixedFlowSource] class.
*/
-internal class SimWorkConsumerTest {
+internal class FixedFlowSourceTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val provider = SimResourceSource(1.0, scheduler)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
- val consumer = SimWorkConsumer(1.0, 1.0)
+ val consumer = FixedFlowSource(1.0, 1.0)
provider.consume(consumer)
assertEquals(1000, clock.millis())
@@ -45,10 +46,10 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val provider = SimResourceSource(1.0, scheduler)
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
- val consumer = SimWorkConsumer(1.0, 0.5)
+ val consumer = FixedFlowSource(1.0, 0.5)
provider.consume(consumer)
assertEquals(2000, clock.millis())
diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts
index eb9adcd1..a8f94602 100644
--- a/opendc-simulator/opendc-simulator-network/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts
@@ -30,6 +30,6 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcSimulator.opendcSimulatorResources)
+ api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt
index 102e5625..4b66d5cf 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.network
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceProvider
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowSource
/**
* A network port allows network devices to be connected to network through links.
@@ -78,14 +78,14 @@ public abstract class SimNetworkPort {
}
/**
- * Create a [SimResourceConsumer] which generates the outgoing traffic of this port.
+ * Create a [FlowSource] which generates the outgoing traffic of this port.
*/
- protected abstract fun createConsumer(): SimResourceConsumer
+ protected abstract fun createConsumer(): FlowSource
/**
- * The [SimResourceProvider] which processes the ingoing traffic of this port.
+ * The [FlowConsumer] which processes the ingoing traffic of this port.
*/
- protected abstract val provider: SimResourceProvider
+ protected abstract val provider: FlowConsumer
override fun toString(): String = "SimNetworkPort[isConnected=$isConnected]"
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
index 7db0f176..4b0d7bbd 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt
@@ -22,22 +22,22 @@
package org.opendc.simulator.network
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
/**
* A network sink which discards all received traffic and does not generate any traffic itself.
*/
public class SimNetworkSink(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
public val capacity: Double
) : SimNetworkPort() {
- override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun createConsumer(): FlowSource = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
override fun toString(): String = "SimNetworkSink.Consumer"
}
- override val provider: SimResourceProvider = SimResourceSource(capacity, interpreter)
+ override val provider: FlowConsumer = FlowSink(engine, capacity)
override fun toString(): String = "SimNetworkSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
index 2267715e..2b7c1ad7 100644
--- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
+++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt
@@ -22,12 +22,13 @@
package org.opendc.simulator.network
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A [SimNetworkSwitch] that can support new networking ports on demand.
*/
-public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimNetworkSwitch {
+public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetworkSwitch {
/**
* The ports of this switch.
*/
@@ -36,9 +37,9 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN
private val _ports = mutableListOf<Port>()
/**
- * The [SimResourceSwitchMaxMin] to actually perform the switching.
+ * The [MaxMinFlowMultiplexer] to actually perform the switching.
*/
- private val switch = SimResourceSwitchMaxMin(interpreter)
+ private val mux = MaxMinFlowMultiplexer(engine)
/**
* Open a new port on the switch.
@@ -58,19 +59,19 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN
*/
private var isClosed: Boolean = false
- override val provider: SimResourceProvider
+ override val provider: FlowConsumer
get() = _provider
- private val _provider = switch.newOutput()
+ private val _provider = mux.newInput()
- override fun createConsumer(): SimResourceConsumer {
- val forwarder = SimResourceForwarder(isCoupled = true)
- switch.addInput(forwarder)
+ override fun createConsumer(): FlowSource {
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ mux.addOutput(forwarder)
return forwarder
}
override fun close() {
isClosed = true
- switch.removeOutput(_provider)
+ mux.removeInput(_provider)
_ports.remove(this)
}
}
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index b8c4b00d..45d0bcf0 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -31,8 +31,8 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimNetworkSink] class.
@@ -40,8 +40,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
class SimNetworkSinkTest {
@Test
fun testInitialState() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
assertFalse(sink.isConnected)
assertNull(sink.link)
@@ -50,8 +50,8 @@ class SimNetworkSinkTest {
@Test
fun testDisconnectIdempotent() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
assertDoesNotThrow { sink.disconnect() }
assertFalse(sink.isConnected)
@@ -59,8 +59,8 @@ class SimNetworkSinkTest {
@Test
fun testConnectCircular() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
assertThrows<IllegalArgumentException> {
sink.connect(sink)
@@ -69,8 +69,8 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnectedTarget() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
val source = mockk<SimNetworkPort>(relaxUnitFun = true)
every { source.isConnected } returns true
@@ -81,9 +81,9 @@ class SimNetworkSinkTest {
@Test
fun testConnectAlreadyConnected() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source1 = Source(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source1 = Source(engine)
val source2 = mockk<SimNetworkPort>(relaxUnitFun = true)
@@ -97,9 +97,9 @@ class SimNetworkSinkTest {
@Test
fun testConnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source = spyk(Source(interpreter))
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source = spyk(Source(engine))
val consumer = source.consumer
sink.connect(source)
@@ -108,14 +108,14 @@ class SimNetworkSinkTest {
assertTrue(source.isConnected)
verify { source.createConsumer() }
- verify { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Start) }
}
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source = spyk(Source(interpreter))
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source = spyk(Source(engine))
val consumer = source.consumer
sink.connect(source)
@@ -124,14 +124,14 @@ class SimNetworkSinkTest {
assertFalse(sink.isConnected)
assertFalse(source.isConnected)
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
- private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() {
- val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8))
+ private class Source(engine: FlowEngine) : SimNetworkPort() {
+ val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8))
- public override fun createConsumer(): SimResourceConsumer = consumer
+ public override fun createConsumer(): FlowSource = consumer
- override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter)
+ override val provider: FlowConsumer = FlowSink(engine, 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
index 3a749bfe..4aa2fa92 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
@@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.*
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimNetworkSwitchVirtual] class.
@@ -37,10 +37,10 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
class SimNetworkSwitchVirtualTest {
@Test
fun testConnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val source = spyk(Source(interpreter))
- val switch = SimNetworkSwitchVirtual(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val source = spyk(Source(engine))
+ val switch = SimNetworkSwitchVirtual(engine)
val consumer = source.consumer
switch.newPort().connect(sink)
@@ -50,14 +50,14 @@ class SimNetworkSwitchVirtualTest {
assertTrue(source.isConnected)
verify { source.createConsumer() }
- verify { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Start) }
}
@Test
fun testConnectClosedPort() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val sink = SimNetworkSink(interpreter, capacity = 100.0)
- val switch = SimNetworkSwitchVirtual(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val sink = SimNetworkSink(engine, capacity = 100.0)
+ val switch = SimNetworkSwitchVirtual(engine)
val port = switch.newPort()
port.close()
@@ -67,11 +67,11 @@ class SimNetworkSwitchVirtualTest {
}
}
- private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() {
- val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8))
+ private class Source(engine: FlowEngine) : SimNetworkPort() {
+ val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8))
- public override fun createConsumer(): SimResourceConsumer = consumer
+ public override fun createConsumer(): FlowSource = consumer
- override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter)
+ override val provider: FlowConsumer = FlowSink(engine, 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts
index f2a49964..e4342a6a 100644
--- a/opendc-simulator/opendc-simulator-power/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts
@@ -30,6 +30,6 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(projects.opendcSimulator.opendcSimulatorResources)
+ api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
index 1a12a52a..c33f5186 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPdu.kt
@@ -22,46 +22,48 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.FlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A model of a Power Distribution Unit (PDU).
*
- * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood.
+ * @param engine The underlying [FlowEngine] to drive the simulation under the hood.
* @param idlePower The idle power consumption of the PDU independent of the load on the PDU.
* @param lossCoefficient The coefficient for the power loss of the PDU proportional to the square load.
*/
public class SimPdu(
- interpreter: SimResourceInterpreter,
+ engine: FlowEngine,
private val idlePower: Double = 0.0,
private val lossCoefficient: Double = 0.0,
) : SimPowerInlet() {
/**
- * The [SimResourceSwitch] that distributes the electricity over the PDU outlets.
+ * The [FlowMultiplexer] that distributes the electricity over the PDU outlets.
*/
- private val switch = SimResourceSwitchMaxMin(interpreter)
+ private val mux = MaxMinFlowMultiplexer(engine)
/**
- * The [SimResourceForwarder] that represents the input of the PDU.
+ * The [FlowForwarder] that represents the input of the PDU.
*/
- private val forwarder = SimResourceForwarder()
+ private val forwarder = FlowForwarder(engine)
/**
* Create a new PDU outlet.
*/
- public fun newOutlet(): Outlet = Outlet(switch, switch.newOutput())
+ public fun newOutlet(): Outlet = Outlet(mux, mux.newInput())
init {
- switch.addInput(forwarder)
+ mux.addOutput(forwarder)
}
- override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer by forwarder {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val duration = forwarder.onNext(ctx, now, delta)
- val loss = computePowerLoss(ctx.demand)
- val newLimit = ctx.demand + loss
+ override fun createConsumer(): FlowSource = object : FlowSource by forwarder {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val duration = forwarder.onPull(conn, now, delta)
+ val loss = computePowerLoss(conn.demand)
+ val newLimit = conn.demand + loss
- ctx.push(newLimit)
+ conn.push(newLimit)
return duration
}
@@ -81,7 +83,7 @@ public class SimPdu(
/**
* A PDU outlet.
*/
- public class Outlet(private val switch: SimResourceSwitch, private val provider: SimResourceProvider) : SimPowerOutlet(), AutoCloseable {
+ public class Outlet(private val switch: FlowMultiplexer, private val provider: FlowConsumer) : SimPowerOutlet(), AutoCloseable {
override fun onConnect(inlet: SimPowerInlet) {
provider.startConsumer(inlet.createConsumer())
}
@@ -94,7 +96,7 @@ public class SimPdu(
* Remove the outlet from the PDU.
*/
override fun close() {
- switch.removeOutput(provider)
+ switch.removeInput(provider)
}
override fun toString(): String = "SimPdu.Outlet"
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt
index 0ac1f199..851b28a5 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerInlet.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.flow.FlowSource
/**
* An abstract inlet that consumes electricity from a power outlet.
@@ -42,7 +42,7 @@ public abstract class SimPowerInlet {
internal var _outlet: SimPowerOutlet? = null
/**
- * Create a [SimResourceConsumer] which represents the consumption of electricity from the power outlet.
+ * Create a [FlowSource] which represents the consumption of electricity from the power outlet.
*/
- public abstract fun createConsumer(): SimResourceConsumer
+ public abstract fun createConsumer(): FlowSource
}
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt
index 3ef8ccc6..7faebd75 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimPowerSource.kt
@@ -22,25 +22,25 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.SimResourceSource
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowSink
/**
* A [SimPowerOutlet] that represents a source of electricity.
*
- * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood.
+ * @param engine The underlying [FlowEngine] to drive the simulation under the hood.
*/
-public class SimPowerSource(interpreter: SimResourceInterpreter, public val capacity: Double) : SimPowerOutlet() {
+public class SimPowerSource(engine: FlowEngine, public val capacity: Double) : SimPowerOutlet() {
/**
* The resource source that drives this power source.
*/
- private val source = SimResourceSource(capacity, interpreter)
+ private val source = FlowSink(engine, capacity)
/**
* The power draw at this instant.
*/
public val powerDraw: Double
- get() = source.speed
+ get() = source.rate
override fun onConnect(inlet: SimPowerInlet) {
source.startConsumer(inlet.createConsumer())
diff --git a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
index 9c7400ed..5eaa91af 100644
--- a/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
+++ b/opendc-simulator/opendc-simulator-power/src/main/kotlin/org/opendc/simulator/power/SimUps.kt
@@ -22,50 +22,51 @@
package org.opendc.simulator.power
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
/**
* A model of an Uninterruptible Power Supply (UPS).
*
* This model aggregates multiple power sources into a single source in order to ensure that power is always available.
*
- * @param interpreter The underlying [SimResourceInterpreter] to drive the simulation under the hood.
+ * @param engine The underlying [FlowEngine] to drive the simulation under the hood.
* @param idlePower The idle power consumption of the UPS independent of the load.
* @param lossCoefficient The coefficient for the power loss of the UPS proportional to the load.
*/
public class SimUps(
- interpreter: SimResourceInterpreter,
+ private val engine: FlowEngine,
private val idlePower: Double = 0.0,
private val lossCoefficient: Double = 0.0,
) : SimPowerOutlet() {
/**
* The resource aggregator used to combine the input sources.
*/
- private val switch = SimResourceSwitchMaxMin(interpreter)
+ private val switch = MaxMinFlowMultiplexer(engine)
/**
- * The [SimResourceProvider] that represents the output of the UPS.
+ * The [FlowConsumer] that represents the output of the UPS.
*/
- private val provider = switch.newOutput()
+ private val provider = switch.newInput()
/**
* Create a new UPS outlet.
*/
public fun newInlet(): SimPowerInlet {
- val forward = SimResourceForwarder(isCoupled = true)
- switch.addInput(forward)
+ val forward = FlowForwarder(engine, isCoupled = true)
+ switch.addOutput(forward)
return Inlet(forward)
}
override fun onConnect(inlet: SimPowerInlet) {
val consumer = inlet.createConsumer()
- provider.startConsumer(object : SimResourceConsumer by consumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val duration = consumer.onNext(ctx, now, delta)
- val loss = computePowerLoss(ctx.demand)
- val newLimit = ctx.demand + loss
+ provider.startConsumer(object : FlowSource by consumer {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val duration = consumer.onPull(conn, now, delta)
+ val loss = computePowerLoss(conn.demand)
+ val newLimit = conn.demand + loss
- ctx.push(newLimit)
+ conn.push(newLimit)
return duration
}
})
@@ -86,8 +87,8 @@ public class SimUps(
/**
* A UPS inlet.
*/
- public inner class Inlet(private val forwarder: SimResourceForwarder) : SimPowerInlet(), AutoCloseable {
- override fun createConsumer(): SimResourceConsumer = forwarder
+ public inner class Inlet(private val forwarder: FlowForwarder) : SimPowerInlet(), AutoCloseable {
+ override fun createConsumer(): FlowSource = forwarder
/**
* Remove the inlet from the PSU.
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index 17a174b7..568a1e8c 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceEvent
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimPdu] class.
@@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimPduTest {
@Test
fun testZeroOutlets() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
assertEquals(0.0, source.powerDraw)
@@ -49,9 +49,9 @@ internal class SimPduTest {
@Test
fun testSingleOutlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
pdu.newOutlet().connect(SimpleInlet())
@@ -60,9 +60,9 @@ internal class SimPduTest {
@Test
fun testDoubleOutlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
pdu.newOutlet().connect(SimpleInlet())
@@ -73,28 +73,28 @@ internal class SimPduTest {
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
- val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0))
+ val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0))
val inlet = object : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = consumer
+ override fun createConsumer(): FlowSource = consumer
}
val outlet = pdu.newOutlet()
outlet.connect(inlet)
outlet.disconnect()
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testLoss() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
- val pdu = SimPdu(interpreter, idlePower = 1.5, lossCoefficient = 0.015)
+ val pdu = SimPdu(engine, idlePower = 1.5, lossCoefficient = 0.015)
source.connect(pdu)
pdu.newOutlet().connect(SimpleInlet())
assertEquals(89.0, source.powerDraw, 0.01)
@@ -102,9 +102,9 @@ internal class SimPduTest {
@Test
fun testOutletClose() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val pdu = SimPdu(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val pdu = SimPdu(engine)
source.connect(pdu)
val outlet = pdu.newOutlet()
outlet.close()
@@ -115,6 +115,6 @@ internal class SimPduTest {
}
class SimpleInlet : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5)
+ override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index f3829ba1..b411e292 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -31,10 +31,10 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceEvent
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimPowerSource]
@@ -42,8 +42,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimPowerSourceTest {
@Test
fun testInitialState() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
assertFalse(source.isConnected)
assertNull(source.inlet)
@@ -52,8 +52,8 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnectIdempotent() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
assertDoesNotThrow { source.disconnect() }
assertFalse(source.isConnected)
@@ -61,8 +61,8 @@ internal class SimPowerSourceTest {
@Test
fun testConnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = SimpleInlet()
source.connect(inlet)
@@ -76,27 +76,27 @@ internal class SimPowerSourceTest {
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0))
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0))
val inlet = object : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = consumer
+ override fun createConsumer(): FlowSource = consumer
}
source.connect(inlet)
source.disconnect()
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
@Test
fun testDisconnectAssertion() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
every { inlet.isConnected } returns false
every { inlet._outlet } returns null
- every { inlet.createConsumer() } returns SimWorkConsumer(100.0, utilization = 1.0)
+ every { inlet.createConsumer() } returns FixedFlowSource(100.0, utilization = 1.0)
source.connect(inlet)
@@ -107,8 +107,8 @@ internal class SimPowerSourceTest {
@Test
fun testOutletAlreadyConnected() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = SimpleInlet()
source.connect(inlet)
@@ -121,8 +121,8 @@ internal class SimPowerSourceTest {
@Test
fun testInletAlreadyConnected() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
val inlet = mockk<SimPowerInlet>(relaxUnitFun = true)
every { inlet.isConnected } returns true
@@ -132,6 +132,6 @@ internal class SimPowerSourceTest {
}
class SimpleInlet : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 1.0)
+ override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 1.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index 8d5fa857..31ac0b39 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -28,10 +28,10 @@ import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceConsumer
-import org.opendc.simulator.resources.SimResourceEvent
-import org.opendc.simulator.resources.SimResourceInterpreter
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.flow.FlowEngine
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.source.FixedFlowSource
/**
* Test suite for the [SimUps] class.
@@ -39,9 +39,9 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimUpsTest {
@Test
fun testSingleInlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
- val ups = SimUps(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
+ val ups = SimUps(engine)
source.connect(ups.newInlet())
ups.connect(SimpleInlet())
@@ -50,10 +50,10 @@ internal class SimUpsTest {
@Test
fun testDoubleInlet() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source1 = SimPowerSource(interpreter, capacity = 100.0)
- val source2 = SimPowerSource(interpreter, capacity = 100.0)
- val ups = SimUps(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source1 = SimPowerSource(engine, capacity = 100.0)
+ val source2 = SimPowerSource(engine, capacity = 100.0)
+ val ups = SimUps(engine)
source1.connect(ups.newInlet())
source2.connect(ups.newInlet())
@@ -67,10 +67,10 @@ internal class SimUpsTest {
@Test
fun testLoss() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source = SimPowerSource(interpreter, capacity = 100.0)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source = SimPowerSource(engine, capacity = 100.0)
// https://download.schneider-electric.com/files?p_Doc_Ref=SPD_NRAN-66CK3D_EN
- val ups = SimUps(interpreter, idlePower = 4.0, lossCoefficient = 0.05)
+ val ups = SimUps(engine, idlePower = 4.0, lossCoefficient = 0.05)
source.connect(ups.newInlet())
ups.connect(SimpleInlet())
@@ -79,24 +79,24 @@ internal class SimUpsTest {
@Test
fun testDisconnect() = runBlockingSimulation {
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val source1 = SimPowerSource(interpreter, capacity = 100.0)
- val source2 = SimPowerSource(interpreter, capacity = 100.0)
- val ups = SimUps(interpreter)
+ val engine = FlowEngine(coroutineContext, clock)
+ val source1 = SimPowerSource(engine, capacity = 100.0)
+ val source2 = SimPowerSource(engine, capacity = 100.0)
+ val ups = SimUps(engine)
source1.connect(ups.newInlet())
source2.connect(ups.newInlet())
- val consumer = spyk(SimWorkConsumer(100.0, utilization = 1.0))
+ val consumer = spyk(FixedFlowSource(100.0, utilization = 1.0))
val inlet = object : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = consumer
+ override fun createConsumer(): FlowSource = consumer
}
ups.connect(inlet)
ups.disconnect()
- verify { consumer.onEvent(any(), SimResourceEvent.Exit) }
+ verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
}
class SimpleInlet : SimPowerInlet() {
- override fun createConsumer(): SimResourceConsumer = SimWorkConsumer(100.0, utilization = 0.5)
+ override fun createConsumer(): FlowSource = FixedFlowSource(100.0, utilization = 0.5)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
deleted file mode 100644
index b406b896..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A controllable [SimResourceContext].
- *
- * This interface is used by resource providers to control the resource context.
- */
-public interface SimResourceControllableContext : SimResourceContext {
- /**
- * The capacity of the resource.
- */
- public override var capacity: Double
-
- /**
- * Start the resource context.
- */
- public fun start()
-
- /**
- * Invalidate the resource context's state.
- *
- * By invalidating the resource context's current state, the state is re-computed and the current progress is
- * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the
- * synchronous variant.
- */
- public fun invalidate()
-
- /**
- * Synchronously flush the progress of the resource context and materialize its current progress.
- */
- public fun flush()
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
deleted file mode 100644
index f1e004d2..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.simulator.resources.interference.InterferenceKey
-import java.util.ArrayDeque
-
-/**
- * A [SimResourceSwitch] implementation that allocates outputs to the inputs of the switch exclusively. This means that
- * a single output is directly connected to an input and that the switch can only support as many outputs as inputs.
- */
-public class SimResourceSwitchExclusive : SimResourceSwitch {
- override val outputs: Set<SimResourceProvider>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
-
- private val _inputs = mutableSetOf<SimResourceProvider>()
- override val inputs: Set<SimResourceProvider>
- get() = _inputs
- private val _availableInputs = ArrayDeque<SimResourceForwarder>()
-
- override val counters: SimResourceCounters = object : SimResourceCounters {
- override val demand: Double
- get() = _inputs.sumOf { it.counters.demand }
- override val actual: Double
- get() = _inputs.sumOf { it.counters.actual }
- override val overcommit: Double
- get() = _inputs.sumOf { it.counters.overcommit }
- override val interference: Double
- get() = _inputs.sumOf { it.counters.interference }
-
- override fun reset() {
- for (input in _inputs) {
- input.counters.reset()
- }
- }
-
- override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
- }
-
- /**
- * Add an output to the switch.
- */
- override fun newOutput(key: InterferenceKey?): SimResourceProvider {
- val forwarder = checkNotNull(_availableInputs.poll()) { "No capacity to serve request" }
- val output = Output(forwarder)
- _outputs += output
- return output
- }
-
- override fun removeOutput(output: SimResourceProvider) {
- if (!_outputs.remove(output)) {
- return
- }
-
- (output as Output).close()
- }
-
- /**
- * Add an input to the switch.
- */
- override fun addInput(input: SimResourceProvider) {
- if (input in inputs) {
- return
- }
-
- val forwarder = SimResourceForwarder()
-
- _inputs += input
- _availableInputs += forwarder
-
- input.startConsumer(object : SimResourceConsumer by forwarder {
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- if (event == SimResourceEvent.Exit) {
- // De-register the input after it has finished
- _inputs -= input
- }
-
- forwarder.onEvent(ctx, event)
- }
- })
- }
-
- override fun clear() {
- for (input in _inputs) {
- input.cancel()
- }
- _inputs.clear()
-
- // Outputs are implicitly cancelled by the inputs forwarders
- _outputs.clear()
- }
-
- /**
- * An output of the resource switch.
- */
- private inner class Output(private val forwarder: SimResourceForwarder) : SimResourceProvider by forwarder {
- /**
- * Close the output.
- */
- fun close() {
- // We explicitly do not close the forwarder here in order to re-use it across output resources.
- _outputs -= this
- _availableInputs += forwarder
- }
-
- override fun toString(): String = "SimResourceSwitchExclusive.Output"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
deleted file mode 100644
index 574fb443..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.simulator.resources.impl.SimResourceCountersImpl
-import org.opendc.simulator.resources.interference.InterferenceDomain
-import org.opendc.simulator.resources.interference.InterferenceKey
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
- * fair sharing.
- *
- * @param interpreter The interpreter for managing the resource contexts.
- * @param parent The parent resource system of the switch.
- * @param interferenceDomain The interference domain of the switch.
- */
-public class SimResourceSwitchMaxMin(
- private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
- private val interferenceDomain: InterferenceDomain? = null
-) : SimResourceSwitch {
- /**
- * The output resource providers to which resource consumers can be attached.
- */
- override val outputs: Set<SimResourceProvider>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
- private val _activeOutputs: MutableList<Output> = mutableListOf()
-
- /**
- * The input resources that will be switched between the output providers.
- */
- override val inputs: Set<SimResourceProvider>
- get() = _inputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
- private val _activeInputs = mutableListOf<Input>()
-
- /**
- * The resource counters of this switch.
- */
- public override val counters: SimResourceCounters
- get() = _counters
- private val _counters = SimResourceCountersImpl()
-
- /**
- * The actual processing rate of the switch.
- */
- private var _rate = 0.0
-
- /**
- * The demanded processing rate of the outputs.
- */
- private var _demand = 0.0
-
- /**
- * The capacity of the switch.
- */
- private var _capacity = 0.0
-
- /**
- * Flag to indicate that the scheduler is active.
- */
- private var _schedulerActive = false
-
- /**
- * Add an output to the switch.
- */
- override fun newOutput(key: InterferenceKey?): SimResourceProvider {
- val provider = Output(_capacity, key)
- _outputs.add(provider)
- return provider
- }
-
- /**
- * Add the specified [input] to the switch.
- */
- override fun addInput(input: SimResourceProvider) {
- val consumer = Input(input)
- if (_inputs.add(input)) {
- _activeInputs.add(consumer)
- input.startConsumer(consumer)
- }
- }
-
- /**
- * Remove [output] from this switch.
- */
- override fun removeOutput(output: SimResourceProvider) {
- if (!_outputs.remove(output)) {
- return
- }
- // This cast should always succeed since only `Output` instances should be added to _outputs
- (output as Output).close()
- }
-
- override fun clear() {
- for (input in _activeInputs) {
- input.cancel()
- }
- _activeInputs.clear()
-
- for (output in _activeOutputs) {
- output.cancel()
- }
- _activeOutputs.clear()
- }
-
- /**
- * Run the scheduler of the switch.
- */
- private fun runScheduler(now: Long) {
- if (_schedulerActive) {
- return
- }
-
- _schedulerActive = true
- try {
- doSchedule(now)
- } finally {
- _schedulerActive = false
- }
- }
-
- /**
- * Schedule the outputs over the input.
- */
- private fun doSchedule(now: Long) {
- // If there is no work yet, mark the input as idle.
- if (_activeOutputs.isEmpty()) {
- return
- }
-
- val capacity = _capacity
- var availableCapacity = capacity
-
- // Pull in the work of the outputs
- val outputIterator = _activeOutputs.listIterator()
- for (output in outputIterator) {
- output.pull(now)
-
- // Remove outputs that have finished
- if (!output.isActive) {
- outputIterator.remove()
- }
- }
-
- var demand = 0.0
-
- // Sort in-place the outputs based on their requested usage.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- _activeOutputs.sort()
-
- // Divide the available input capacity fairly across the outputs using max-min fair sharing
- var remaining = _activeOutputs.size
- for (output in _activeOutputs) {
- val availableShare = availableCapacity / remaining--
- val grantedSpeed = min(output.allowedRate, availableShare)
-
- // Ignore idle computation
- if (grantedSpeed <= 0.0) {
- output.actualRate = 0.0
- continue
- }
-
- demand += output.limit
-
- output.actualRate = grantedSpeed
- availableCapacity -= grantedSpeed
- }
-
- val rate = capacity - availableCapacity
-
- _demand = demand
- _rate = rate
-
- // Sort all consumers by their capacity
- _activeInputs.sort()
-
- // Divide the requests over the available capacity of the input resources fairly
- for (input in _activeInputs) {
- val inputCapacity = input.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- input.push(grantedSpeed)
- }
- }
-
- /**
- * Recompute the capacity of the switch.
- */
- private fun updateCapacity() {
- val newCapacity = _activeInputs.sumOf(Input::capacity)
-
- // No-op if the capacity is unchanged
- if (_capacity == newCapacity) {
- return
- }
-
- _capacity = newCapacity
-
- for (output in _outputs) {
- output.capacity = newCapacity
- }
- }
-
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class Output(capacity: Double, val key: InterferenceKey?) :
- SimAbstractResourceProvider(interpreter, capacity),
- SimResourceProviderLogic,
- Comparable<Output> {
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- @JvmField var actualRate: Double = 0.0
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- val allowedRate: Double
- get() = min(capacity, limit)
-
- /**
- * A flag to indicate that the output is closed.
- */
- private var _isClosed: Boolean = false
-
- /**
- * The timestamp at which we received the last command.
- */
- private var _lastPull: Long = Long.MIN_VALUE
-
- /**
- * Close the output.
- *
- * This method is invoked when the user removes an output from the switch.
- */
- fun close() {
- _isClosed = true
- cancel()
- }
-
- /* SimAbstractResourceProvider */
- override fun createLogic(): SimResourceProviderLogic = this
-
- override fun start(ctx: SimResourceControllableContext) {
- check(!_isClosed) { "Cannot re-use closed output" }
-
- _activeOutputs += this
- super.start(ctx)
- }
-
- /* SimResourceProviderLogic */
- override fun onConsume(
- ctx: SimResourceControllableContext,
- now: Long,
- delta: Long,
- limit: Double,
- duration: Long
- ) {
- doUpdateCounters(delta)
-
- actualRate = 0.0
- this.limit = limit
- _lastPull = now
-
- runScheduler(now)
- }
-
- override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- parent?.onConverge(now)
- }
-
- override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
- doUpdateCounters(delta)
-
- limit = 0.0
- actualRate = 0.0
- _lastPull = now
- }
-
- /* Comparable */
- override fun compareTo(other: Output): Int = allowedRate.compareTo(other.allowedRate)
-
- /**
- * Pull the next command if necessary.
- */
- fun pull(now: Long) {
- val ctx = ctx
- if (ctx != null && _lastPull < now) {
- ctx.flush()
- }
- }
-
- /**
- * Helper method to update the resource counters of the distributor.
- */
- private fun doUpdateCounters(delta: Long) {
- if (delta <= 0L) {
- return
- }
-
- // Compute the performance penalty due to resource interference
- val perfScore = if (interferenceDomain != null) {
- val load = _rate / capacity
- interferenceDomain.apply(key, load)
- } else {
- 1.0
- }
-
- val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualRate * deltaS
- val remainingWork = work - actualWork
-
- updateCounters(work, actualWork, remainingWork)
-
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
- }
- }
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private inner class Input(private val provider: SimResourceProvider) : SimResourceConsumer, Comparable<Input> {
- /**
- * The active [SimResourceContext] of this consumer.
- */
- private var _ctx: SimResourceContext? = null
-
- /**
- * The capacity of this input.
- */
- val capacity: Double
- get() = _ctx?.capacity ?: 0.0
-
- /**
- * Push the specified rate to the provider.
- */
- fun push(rate: Double) {
- _ctx?.push(rate)
- }
-
- /**
- * Cancel this input.
- */
- fun cancel() {
- provider.cancel()
- }
-
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- runScheduler(now)
- return Long.MAX_VALUE
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- assert(_ctx == null) { "Consumer running concurrently" }
- _ctx = ctx
- updateCapacity()
- }
- SimResourceEvent.Exit -> {
- _ctx = null
- updateCapacity()
- }
- SimResourceEvent.Capacity -> updateCapacity()
- else -> {}
- }
- }
-
- override fun compareTo(other: Input): Int = capacity.compareTo(other.capacity)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
deleted file mode 100644
index 1428ce42..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import io.mockk.*
-import kotlinx.coroutines.*
-import org.junit.jupiter.api.*
-import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.simulator.resources.impl.SimResourceContextImpl
-import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
-
-/**
- * A test suite for the [SimResourceContextImpl] class.
- */
-class SimResourceContextTest {
- @Test
- fun testFlushWithoutCommand() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(1.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(interpreter, consumer, logic)
-
- interpreter.scheduleSync(interpreter.clock.millis(), context)
- }
-
- @Test
- fun testIntermediateFlush() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = SimWorkConsumer(1.0, 1.0)
-
- val logic = spyk(object : SimResourceProviderLogic {})
- val context = SimResourceContextImpl(interpreter, consumer, logic)
- context.capacity = 1.0
-
- context.start()
- delay(1) // Delay 1 ms to prevent hitting the fast path
- interpreter.scheduleSync(interpreter.clock.millis(), context)
-
- verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) }
- }
-
- @Test
- fun testIntermediateFlushIdle() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = SimWorkConsumer(1.0, 1.0)
-
- val logic = spyk(object : SimResourceProviderLogic {})
- val context = SimResourceContextImpl(interpreter, consumer, logic)
- context.capacity = 1.0
-
- context.start()
- delay(500)
- context.invalidate()
- delay(500)
- context.invalidate()
-
- assertAll(
- { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } },
- { verify(exactly = 1) { logic.onFinish(any(), any(), any()) } }
- )
- }
-
- @Test
- fun testDoubleStart() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(0.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(interpreter, consumer, logic)
-
- context.start()
-
- assertThrows<IllegalStateException> {
- context.start()
- }
- }
-
- @Test
- fun testIdempotentCapacityChange() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(1.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- })
-
- val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(interpreter, consumer, logic)
- context.capacity = 4200.0
- context.start()
- context.capacity = 4200.0
-
- verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
- }
-
- @Test
- fun testFailureNoInfiniteLoop() = runBlockingSimulation {
- val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
-
- val consumer = spyk(object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- ctx.close()
- return Long.MAX_VALUE
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- if (event == SimResourceEvent.Exit) throw IllegalStateException("onEvent")
- }
-
- override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
- throw IllegalStateException("onFailure")
- }
- })
-
- val logic = object : SimResourceProviderLogic {}
-
- val context = SimResourceContextImpl(interpreter, consumer, logic)
-
- context.start()
-
- delay(1)
-
- verify(exactly = 1) { consumer.onFailure(any(), any()) }
- }
-}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 96b300d7..59308e11 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -123,7 +123,7 @@ class RunnerCli : CliktCommand(name = "runner") {
.default(60L * 3) // Experiment may run for a maximum of three minutes
/**
- * Run a single scenario.
+ * Converge a single scenario.
*/
private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> {
val id = scenario.id
@@ -158,7 +158,7 @@ class RunnerCli : CliktCommand(name = "runner") {
}
/**
- * Run a single repeat.
+ * Converge a single repeat.
*/
private suspend fun runRepeat(
scenario: Scenario,
@@ -199,7 +199,7 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
// Instantiate the topology onto the simulator
simulator.apply(topology)
- // Run workload trace
+ // Converge workload trace
simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
simulator.close()
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 992b4991..04f54e58 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -42,7 +42,7 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.trace.Trace
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -70,7 +70,7 @@ internal class WorkflowServiceTest {
.setClock(clock.toOtelClock())
.build()
- val interpreter = SimResourceInterpreter(coroutineContext, clock)
+ val interpreter = FlowEngine(coroutineContext, clock)
val machineModel = createMachineModel()
val hvProvider = SimSpaceSharedHypervisorProvider()
val hosts = List(4) { id ->
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 587f1cb2..d9b3d940 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -38,7 +38,7 @@ include(":opendc-web:opendc-web-api")
include(":opendc-web:opendc-web-ui")
include(":opendc-web:opendc-web-runner")
include(":opendc-simulator:opendc-simulator-core")
-include(":opendc-simulator:opendc-simulator-resources")
+include(":opendc-simulator:opendc-simulator-flow")
include(":opendc-simulator:opendc-simulator-power")
include(":opendc-simulator:opendc-simulator-network")
include(":opendc-simulator:opendc-simulator-compute")