summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt31
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt61
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt105
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt39
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt12
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt246
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt42
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt33
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt50
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt64
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt17
34 files changed, 627 insertions, 441 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index f08a7e1e..89016c55 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -112,7 +112,7 @@ public class SimHost(
*/
public val hypervisor: SimHypervisor = hypervisor.create(
interpreter,
- object : SimHypervisor.Listener {
+ listener = object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
requestedWork: Long,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 893e8bcd..4b21b4f7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -114,7 +114,7 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
{ assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207112418947, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
{ assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index 2df307d3..2b84a17c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -166,13 +166,23 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource
/**
* The actual resource supporting the processing unit.
*/
- private val source = switch.addOutput(model.frequency)
-
- override val speed: Double = 0.0 /* TODO Implement */
+ private val source = switch.newOutput()
override val state: SimResourceState
get() = source.state
+ override val capacity: Double
+ get() = source.capacity
+
+ override val speed: Double
+ get() = source.speed
+
+ override val demand: Double
+ get() = source.demand
+
+ override val counters: SimResourceCounters
+ get() = source.counters
+
override fun startConsumer(consumer: SimResourceConsumer) {
source.startConsumer(consumer)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index de2b3eef..d40cdac5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -37,8 +37,12 @@ import java.time.Clock
* Abstract implementation of the [SimMachine] interface.
*
* @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
*/
-public abstract class SimAbstractMachine(protected val interpreter: SimResourceInterpreter) : SimMachine, SimResourceSystem {
+public abstract class SimAbstractMachine(
+ protected val interpreter: SimResourceInterpreter,
+ final override val parent: SimResourceSystem?
+) : SimMachine, SimResourceSystem {
private val _usage = MutableStateFlow(0.0)
override val usage: StateFlow<Double>
get() = _usage
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index f5218ba9..082719e2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -28,25 +28,27 @@ import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.*
import org.opendc.simulator.resources.SimResourceInterpreter
-import kotlin.coroutines.*
/**
* A simulated bare-metal machine that is able to run a single workload.
*
* A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For
- * example. the class expects only a single concurrent call to [run].
+ * example. The class expects only a single concurrent call to [run].
*
- * @param context The [CoroutineContext] to run the simulated workload in.
- * @param clock The virtual clock to track the simulation time.
+ * @param interpreter The [SimResourceInterpreter] to drive the simulation.
* @param model The machine model to simulate.
+ * @param scalingGovernor The CPU frequency scaling governor to use.
+ * @param scalingDriver The CPU frequency scaling driver to use.
+ * @param parent The parent simulation system.
*/
@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
public class SimBareMetalMachine(
- platform: SimResourceInterpreter,
+ interpreter: SimResourceInterpreter,
override val model: SimMachineModel,
scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver
-) : SimAbstractMachine(platform) {
+ scalingDriver: ScalingDriver,
+ parent: SimResourceSystem? = null,
+) : SimAbstractMachine(interpreter, parent) {
override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
/**
@@ -61,8 +63,6 @@ public class SimBareMetalMachine(
scalingGovernor.createLogic(this.scalingDriver.createContext(cpu))
}
- override val parent: SimResourceSystem? = null
-
init {
scalingGovernors.forEach { it.onStart() }
}
@@ -89,11 +89,20 @@ public class SimBareMetalMachine(
*/
private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine)
+ override val state: SimResourceState
+ get() = source.state
+
+ override val capacity: Double
+ get() = source.capacity
+
override val speed: Double
get() = source.speed
- override val state: SimResourceState
- get() = source.state
+ override val demand: Double
+ get() = source.demand
+
+ override val counters: SimResourceCounters
+ get() = source.counters
override fun startConsumer(consumer: SimResourceConsumer) {
source.startConsumer(consumer)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 33e7e637..3ceb3291 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -26,33 +26,62 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.simulator.resources.SimResourceSwitch
import org.opendc.simulator.resources.SimResourceSwitchMaxMin
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
* [SimBareMetalMachine] concurrently using weighted fair sharing.
*
+ * @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val interpreter: SimResourceInterpreter, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor(interpreter) {
+public class SimFairShareHypervisor(
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null,
+ private val listener: SimHypervisor.Listener? = null
+) : SimAbstractHypervisor(interpreter) {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
- return SimResourceSwitchMaxMin(
- interpreter, null,
- object : SimResourceSwitchMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
+ return SwitchSystem().switch
+ }
+
+ private inner class SwitchSystem : SimResourceSystem {
+ val switch = SimResourceSwitchMaxMin(interpreter, this)
+
+ override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
+
+ private var lastCpuUsage = 0.0
+ private var lastCpuDemand = 0.0
+ private var lastDemand = 0.0
+ private var lastActual = 0.0
+ private var lastOvercommit = 0.0
+ private var lastReport = Long.MIN_VALUE
+
+ override fun onConverge(timestamp: Long) {
+ val listener = listener ?: return
+ val counters = switch.counters
+
+ if (timestamp > lastReport) {
+ listener.onSliceFinish(
+ this@SimFairShareHypervisor,
+ (counters.demand - lastDemand).toLong(),
+ (counters.actual - lastActual).toLong(),
+ (counters.overcommit - lastOvercommit).toLong(),
+ 0L,
+ lastCpuUsage,
+ lastCpuDemand
+ )
}
- )
+ lastReport = timestamp
+
+ lastCpuDemand = switch.inputs.sumOf { it.demand }
+ lastCpuUsage = switch.inputs.sumOf { it.speed }
+ lastDemand = counters.demand
+ lastActual = counters.actual
+ lastOvercommit = counters.overcommit
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 68858cc1..d3206196 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimFairShareHypervisor(interpreter, listener)
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index d0753084..8e8c3698 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -39,5 +40,9 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null,
+ listener: SimHypervisor.Listener? = null
+ ): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
index 13c7d9b2..136a543a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
@@ -33,9 +33,4 @@ public interface SimProcessingUnit : SimResourceProvider {
* The model representing the static properties of the processing unit.
*/
public val model: ProcessingUnit
-
- /**
- * The current speed of the processing unit.
- */
- public val speed: Double
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index c017c8ab..923b5bab 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimSpaceSharedHypervisor(interpreter)
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index c1b5089c..1709cc23 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -96,7 +96,7 @@ internal class SimHypervisorTest {
val platform = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(platform, listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, listener)
launch {
machine.run(hypervisor)
@@ -171,7 +171,7 @@ internal class SimHypervisorTest {
platform, model, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(platform, listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, listener)
launch {
machine.run(hypervisor)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index 9233c72d..b45b2a2f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -90,7 +90,7 @@ class SimResourceBenchmarks {
switch.addInput(SimResourceSource(3000.0, interpreter))
switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -105,7 +105,7 @@ class SimResourceBenchmarks {
repeat(3) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -120,7 +120,7 @@ class SimResourceBenchmarks {
switch.addInput(SimResourceSource(3000.0, interpreter))
switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -135,7 +135,7 @@ class SimResourceBenchmarks {
repeat(2) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index be04d399..5fe7d7bb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -42,7 +42,7 @@ public abstract class SimAbstractResourceAggregator(
/**
* This method is invoked when the resource consumer finishes processing.
*/
- protected abstract fun doFinish(cause: Throwable?)
+ protected abstract fun doFinish()
/**
* This method is invoked when an input context is started.
@@ -54,8 +54,9 @@ public abstract class SimAbstractResourceAggregator(
*/
protected abstract fun onInputFinished(input: Input)
+ /* SimResourceAggregator */
override fun addInput(input: SimResourceProvider) {
- check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+ check(state != SimResourceState.Stopped) { "Aggregator has been stopped" }
val consumer = Consumer()
_inputs.add(input)
@@ -63,44 +64,77 @@ public abstract class SimAbstractResourceAggregator(
input.startConsumer(consumer)
}
- override fun close() {
- output.close()
- }
-
- override val output: SimResourceProvider
- get() = _output
- private val _output = SimResourceForwarder()
-
override val inputs: Set<SimResourceProvider>
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
private val _inputConsumers = mutableListOf<Consumer>()
- protected val outputContext: SimResourceContext
- get() = context
- private val context = interpreter.newContext(
- _output,
- object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- doIdle(deadline)
- return Long.MAX_VALUE
- }
+ /* SimResourceProvider */
+ override val state: SimResourceState
+ get() = _output.state
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- doConsume(work, limit, deadline)
- return Long.MAX_VALUE
- }
+ override val capacity: Double
+ get() = _output.capacity
- override fun onFinish(ctx: SimResourceControllableContext) {
- doFinish(null)
- }
+ override val speed: Double
+ get() = _output.speed
+
+ override val demand: Double
+ get() = _output.demand
+
+ override val counters: SimResourceCounters
+ get() = _output.counters
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ _output.startConsumer(consumer)
+ }
+
+ override fun cancel() {
+ _output.cancel()
+ }
+
+ override fun interrupt() {
+ _output.interrupt()
+ }
+
+ override fun close() {
+ _output.close()
+ }
+
+ private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
+ override fun createLogic(): SimResourceProviderLogic {
+ return object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
+ doIdle(deadline)
+ return Long.MAX_VALUE
+ }
+
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ doConsume(work, limit, deadline)
+ return Long.MAX_VALUE
+ }
+
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ doFinish()
+ }
+
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
- override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- return _inputConsumers.sumOf { it.remainingWork }
+ override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return _inputConsumers.sumOf { it.remainingWork }
+ }
}
- },
- parent
- )
+ }
+
+ /**
+ * Flush the progress of the output if possible.
+ */
+ fun flush() {
+ ctx?.flush()
+ }
+ }
/**
* An input for the resource aggregator.
@@ -141,7 +175,7 @@ public abstract class SimAbstractResourceAggregator(
private fun updateCapacity() {
// Adjust capacity of output resource
- context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
+ _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
}
/* Input */
@@ -158,7 +192,7 @@ public abstract class SimAbstractResourceAggregator(
this.command = null
next
} else {
- context.flush()
+ _output.flush()
next = command
this.command = null
@@ -172,11 +206,6 @@ public abstract class SimAbstractResourceAggregator(
_ctx = ctx
updateCapacity()
- // Make sure we initialize the output if we have not done so yet
- if (context.state == SimResourceState.Pending) {
- context.start()
- }
-
onInputStarted(this)
}
SimResourceEvent.Capacity -> updateCapacity()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index 519c2615..de26f99e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -22,27 +22,49 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
/**
* Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations.
*/
public abstract class SimAbstractResourceProvider(
private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
+ private val parent: SimResourceSystem?,
initialCapacity: Double
) : SimResourceProvider {
/**
* The capacity of the resource.
*/
- public open var capacity: Double = initialCapacity
- protected set(value) {
+ public override var capacity: Double = initialCapacity
+ set(value) {
field = value
ctx?.capacity = value
}
/**
+ * The current processing speed of the resource.
+ */
+ public override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
+ /**
* The [SimResourceControllableContext] that is currently running.
*/
protected var ctx: SimResourceControllableContext? = null
+ private set
/**
* The state of the resource provider.
@@ -62,6 +84,17 @@ public abstract class SimAbstractResourceProvider(
ctx.start()
}
+ /**
+ * Update the counters of the resource provider.
+ */
+ protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += work
+ counters.actual += work - remainingWork
+ counters.overcommit += remainingWork
+ }
+
final override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
val ctx = interpreter.newContext(consumer, createLogic(), parent)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
index 5c0346cd..00972f43 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -25,12 +25,7 @@ package org.opendc.simulator.resources
/**
* A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
*/
-public interface SimResourceAggregator : AutoCloseable {
- /**
- * The output resource provider to which resource consumers can be attached.
- */
- public val output: SimResourceProvider
-
+public interface SimResourceAggregator : SimResourceProvider {
/**
* The input resources that will be switched between the output providers.
*/
@@ -40,9 +35,4 @@ public interface SimResourceAggregator : AutoCloseable {
* Add the specified [input] to the aggregator.
*/
public fun addInput(input: SimResourceProvider)
-
- /**
- * End the lifecycle of the aggregator.
- */
- public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index bdab6def..c39c1aca 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -38,7 +38,7 @@ public class SimResourceAggregatorMaxMin(
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
val inputCapacity = input.ctx.capacity
- val fraction = inputCapacity / outputContext.capacity
+ val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
val grantedWork = fraction * work
@@ -56,7 +56,7 @@ public class SimResourceAggregatorMaxMin(
}
}
- override fun doFinish(cause: Throwable?) {
+ override fun doFinish() {
val iterator = consumers.iterator()
for (input in iterator) {
iterator.remove()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 7c76c634..0d9a6106 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -45,6 +45,11 @@ public interface SimResourceContext {
public val speed: Double
/**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
* The amount of work still remaining at this instant.
*/
public val remainingWork: Double
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
new file mode 100644
index 00000000..725aa5bc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * An interface that tracks cumulative counts of the work performed by a resource.
+ */
+public interface SimResourceCounters {
+ /**
+ * The amount of work that resource consumers wanted the resource to perform.
+ */
+ public val demand: Double
+
+ /**
+ * The amount of work performed by the resource.
+ */
+ public val actual: Double
+
+ /**
+ * The amount of work that could not be completed due to overcommitted resources.
+ */
+ public val overcommit: Double
+
+ /**
+ * Reset the resource counters.
+ */
+ public fun reset()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index 8dd1bd2b..e0333ff9 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -25,19 +25,14 @@ package org.opendc.simulator.resources
/**
* A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
*/
-public interface SimResourceDistributor : AutoCloseable {
+public interface SimResourceDistributor : SimResourceConsumer {
/**
* The output resource providers to which resource consumers can be attached.
*/
public val outputs: Set<SimResourceProvider>
/**
- * The input resource that will be distributed over the consumers.
+ * Create a new output for the distributor.
*/
- public val input: SimResourceProvider
-
- /**
- * Create a new output for the distributor with the specified [capacity].
- */
- public fun addOutput(capacity: Double): SimResourceProvider
+ public fun newOutput(): SimResourceProvider
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index e99f5eff..be9e89fb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -29,24 +29,22 @@ import kotlin.math.min
* A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
*/
public class SimResourceDistributorMaxMin(
- override val input: SimResourceProvider,
private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
- private val listener: Listener? = null
+ private val parent: SimResourceSystem? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
get() = _outputs
private val _outputs = mutableSetOf<Output>()
/**
- * The active outputs.
+ * The resource context of the consumer.
*/
- private val activeOutputs: MutableList<Output> = mutableListOf()
+ private var ctx: SimResourceContext? = null
/**
- * The total speed requested by the output resources.
+ * The active outputs.
*/
- private var totalRequestedSpeed = 0.0
+ private val activeOutputs: MutableList<Output> = mutableListOf()
/**
* The total amount of work requested by the output resources.
@@ -58,145 +56,83 @@ public class SimResourceDistributorMaxMin(
*/
private var totalAllocatedSpeed = 0.0
- /**
- * The total allocated work requested for the output resources.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
- */
- private var totalInterferedWork = 0.0
-
- /**
- * The timestamp of the last report.
- */
- private var lastReport: Long = Long.MIN_VALUE
-
- /**
- * A flag to indicate that the switch is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private val consumer = object : SimResourceConsumer {
- /**
- * The resource context of the consumer.
- */
- private lateinit var ctx: SimResourceContext
-
- val remainingWork: Double
- get() = ctx.remainingWork
-
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return doNext(ctx.capacity)
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- }
- SimResourceEvent.Exit -> {
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
-
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
-
- output.close()
- }
- }
- else -> {}
- }
- }
+ /* SimResourceDistributor */
+ override fun newOutput(): SimResourceProvider {
+ val provider = Output(ctx?.capacity ?: 0.0)
+ _outputs.add(provider)
+ return provider
}
- /**
- * The total amount of remaining work.
- */
- private val totalRemainingWork: Double
- get() = consumer.remainingWork
-
- init {
- input.startConsumer(consumer)
+ /* SimResourceConsumer */
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
}
- override fun addOutput(capacity: Double): SimResourceProvider {
- check(!isClosed) { "Distributor has been closed" }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ updateCapacity(ctx)
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
- val provider = Output(capacity)
- _outputs.add(provider)
- return provider
- }
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
- override fun close() {
- if (!isClosed) {
- isClosed = true
- input.cancel()
+ output.close()
+ }
+ }
+ SimResourceEvent.Capacity -> updateCapacity(ctx)
+ else -> {}
}
}
/**
- * Schedule the work over the physical CPUs.
+ * Schedule the work of the outputs.
*/
- private fun doSchedule(capacity: Double): SimResourceCommand {
- // If there is no work yet, mark all inputs as idle.
+ private fun doNext(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
return SimResourceCommand.Idle()
}
- val maxUsage = capacity
var duration: Double = Double.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
+ var availableSpeed = capacity
var totalRequestedSpeed = 0.0
var totalRequestedWork = 0.0
- // Flush the work of the outputs
- var outputIterator = activeOutputs.listIterator()
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
-
+ // Pull in the work of the outputs
+ val outputIterator = activeOutputs.listIterator()
+ for (output in outputIterator) {
output.pull()
+ // Remove outputs that have finished
if (output.isFinished) {
- // The output consumer has exited, so remove it from the scheduling queue.
outputIterator.remove()
}
}
- // Sort the outputs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ // Sort in-place the outputs based on their requested usage.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
activeOutputs.sort()
// Divide the available input capacity fairly across the outputs using max-min fair sharing
- outputIterator = activeOutputs.listIterator()
var remaining = activeOutputs.size
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
+ for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
when (val command = output.activeCommand) {
is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
-
output.actualSpeed = 0.0
}
is SimResourceCommand.Consume -> {
val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
// Ignore idle computation
@@ -211,7 +147,7 @@ public class SimResourceDistributorMaxMin(
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
- // The duration that we want to run is that of the shortest request from an output
+ // The duration that we want to run is that of the shortest request of an output
duration = min(duration, command.work / grantedSpeed)
}
SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
@@ -220,67 +156,23 @@ public class SimResourceDistributorMaxMin(
assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
- this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = maxUsage - availableSpeed
- this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration))
+ this.totalAllocatedSpeed = capacity - availableSpeed
+ val totalAllocatedWork = min(
+ totalRequestedWork,
+ totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)
+ )
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline)
else
SimResourceCommand.Idle(deadline)
}
- /**
- * Obtain the next command to perform.
- */
- private fun doNext(capacity: Double): SimResourceCommand {
- val totalRequestedWork = totalRequestedWork.toLong()
- val totalAllocatedWork = totalAllocatedWork.toLong()
- val totalRemainingWork = totalRemainingWork.toLong()
- val totalRequestedSpeed = totalRequestedSpeed
- val totalAllocatedSpeed = totalAllocatedSpeed
-
- // Force all inputs to re-schedule their work.
- val command = doSchedule(capacity)
-
- val now = interpreter.clock.millis()
- if (lastReport < now) {
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork,
- totalAllocatedWork - totalRemainingWork,
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalAllocatedSpeed,
- totalRequestedSpeed
- )
- lastReport = now
+ private fun updateCapacity(ctx: SimResourceContext) {
+ for (output in _outputs) {
+ output.capacity = ctx.capacity
}
-
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
-
- return command
- }
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
}
/**
@@ -322,7 +214,7 @@ public class SimResourceDistributorMaxMin(
interpreter.batch {
ctx.start()
// Interrupt the input to re-schedule the resources
- input.interrupt()
+ this@SimResourceDistributorMaxMin.ctx?.interrupt()
}
}
@@ -338,8 +230,6 @@ public class SimResourceDistributorMaxMin(
/* SimResourceProviderLogic */
override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- reportOvercommit(ctx.remainingWork)
-
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
lastCommandTimestamp = ctx.clock.millis()
@@ -348,8 +238,6 @@ public class SimResourceDistributorMaxMin(
}
override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- reportOvercommit(ctx.remainingWork)
-
allowedSpeed = ctx.speed
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
lastCommandTimestamp = ctx.clock.millis()
@@ -357,31 +245,25 @@ public class SimResourceDistributorMaxMin(
return Long.MAX_VALUE
}
- override fun onFinish(ctx: SimResourceControllableContext) {
- reportOvercommit(ctx.remainingWork)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+ override fun onFinish(ctx: SimResourceControllableContext) {
activeCommand = SimResourceCommand.Exit
lastCommandTimestamp = ctx.clock.millis()
}
override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- // Apply performance interference model
- val performanceScore = 1.0
+ val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0
- // Compute the remaining amount of work
return if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
+ // Compute the fraction of compute time allocated to the output
val fraction = actualSpeed / totalAllocatedSpeed
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
+ // Compute the work that was actually granted to the output.
+ val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction
+ max(0.0, work - processingAvailable)
} else {
0.0
}
@@ -399,9 +281,5 @@ public class SimResourceDistributorMaxMin(
ctx.flush()
}
}
-
- private fun reportOvercommit(remainingWork: Double) {
- totalOvercommittedWork += remainingWork
- }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 2f567a5e..f709ca17 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -27,7 +27,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
- * A [SimResourceProvider] provides some resource of type [R].
+ * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
*/
public interface SimResourceProvider : AutoCloseable {
/**
@@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable {
public val state: SimResourceState
/**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public val speed: Double
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public val counters: SimResourceCounters
+
+ /**
* Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
*
* @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index 22676984..5231ecf5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -29,7 +29,7 @@ import kotlin.math.max
*/
public interface SimResourceProviderLogic {
/**
- * This method is invoked when the resource will idle until the specified [deadline].
+ * This method is invoked when the resource is reported to idle until the specified [deadline].
*
* @param ctx The context in which the provider runs.
* @param deadline The deadline that was requested by the resource consumer.
@@ -38,8 +38,8 @@ public interface SimResourceProviderLogic {
public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long
/**
- * This method is invoked when the resource will be consumed until the specified [work] was processed or the
- * [deadline] was reached.
+ * This method is invoked when the resource will be consumed until the specified amount of [work] was processed
+ * or [deadline] is reached.
*
* @param ctx The context in which the provider runs.
* @param work The amount of work that was requested by the resource consumer.
@@ -50,6 +50,14 @@ public interface SimResourceProviderLogic {
public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long
/**
+ * This method is invoked when the progress of the resource consumer is materialized.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param work The amount of work that was requested by the resource consumer.
+ */
+ public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {}
+
+ /**
* This method is invoked when the resource consumer has finished.
*/
public fun onFinish(ctx: SimResourceControllableContext)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index d984d2a5..9f062cc3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -37,21 +37,6 @@ public class SimResourceSource(
private val interpreter: SimResourceInterpreter,
private val parent: SimResourceSystem? = null
) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) {
- /**
- * The current processing speed of the resource.
- */
- public val speed: Double
- get() = ctx?.speed ?: 0.0
-
- /**
- * The capacity of the resource.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
@@ -62,6 +47,10 @@ public class SimResourceSource(
return min(deadline, ctx.clock.millis() + getDuration(work, speed))
}
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+
override fun onFinish(ctx: SimResourceControllableContext) {
cancel()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index 53fec16a..e224285e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable {
public val inputs: Set<SimResourceProvider>
/**
- * Add an output to the switch with the specified [capacity].
+ * The resource counters to track the execution metrics of all switch resources.
*/
- public fun addOutput(capacity: Double): SimResourceProvider
+ public val counters: SimResourceCounters
+
+ /**
+ * Create a new output on the switch.
+ */
+ public fun newOutput(): SimResourceProvider
/**
* Add the specified [input] to the switch.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 7f1bb2b7..2950af80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
override val inputs: Set<SimResourceProvider>
get() = _inputs
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override val counters: SimResourceCounters = object : SimResourceCounters {
+ override val demand: Double
+ get() = _inputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _inputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _inputs.sumOf { it.counters.overcommit }
+
+ override fun reset() {
+ for (input in _inputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
- val output = Provider(capacity, forwarder)
+ val output = Provider(forwarder)
_outputs += output
return output
}
@@ -84,10 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
_inputs.forEach(SimResourceProvider::cancel)
}
- private inner class Provider(
- private val capacity: Double,
- private val forwarder: SimResourceTransformer
- ) : SimResourceProvider by forwarder {
+ private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder {
override fun close() {
// We explicitly do not close the forwarder here in order to re-use it across output resources.
_outputs -= this
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 61887e34..684a1b52 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -28,16 +28,25 @@ package org.opendc.simulator.resources
*/
public class SimResourceSwitchMaxMin(
interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null,
- private val listener: Listener? = null
+ parent: SimResourceSystem? = null
) : SimResourceSwitch {
- private val _outputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
override val outputs: Set<SimResourceProvider>
- get() = _outputs
+ get() = distributor.outputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The input resources that will be switched between the output providers.
+ */
override val inputs: Set<SimResourceProvider>
- get() = _inputs
+ get() = aggregator.inputs
+
+ /**
+ * The resource counters to track the execution metrics of all switch resources.
+ */
+ override val counters: SimResourceCounters
+ get() = aggregator.counters
/**
* A flag to indicate that the switch was closed.
@@ -52,32 +61,19 @@ public class SimResourceSwitchMaxMin(
/**
* The distributor to distribute the aggregated resources.
*/
- private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, interpreter, parent,
- object : SimResourceDistributorMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
- }
- )
+ private val distributor = SimResourceDistributorMaxMin(interpreter, parent)
+
+ init {
+ aggregator.startConsumer(distributor)
+ }
/**
- * Add an output to the switch with the specified [capacity].
+ * Add an output to the switch.
*/
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
- val provider = distributor.addOutput(capacity)
- _outputs.add(provider)
- return provider
+ return distributor.newOutput()
}
/**
@@ -92,26 +88,7 @@ public class SimResourceSwitchMaxMin(
override fun close() {
if (!isClosed) {
isClosed = true
- distributor.close()
aggregator.close()
}
}
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 32f3f573..fd3d1230 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
/**
* A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
*
@@ -53,6 +55,19 @@ public class SimResourceTransformer(
override var state: SimResourceState = SimResourceState.Pending
private set
+ override val capacity: Double
+ get() = ctx?.capacity ?: 0.0
+
+ override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
@@ -97,10 +112,15 @@ public class SimResourceTransformer(
start()
}
+ updateCounters(ctx)
+
return if (state == SimResourceState.Stopped) {
SimResourceCommand.Exit
} else if (delegate != null) {
val command = transform(ctx, delegate.onNext(ctx))
+
+ _work = if (command is SimResourceCommand.Consume) command.work else 0.0
+
if (command == SimResourceCommand.Exit) {
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
@@ -169,6 +189,22 @@ public class SimResourceTransformer(
state = SimResourceState.Pending
}
}
+
+ /**
+ * Counter to track the current submitted work.
+ */
+ private var _work = 0.0
+
+ /**
+ * Update the resource counters for the transformer.
+ */
+ private fun updateCounters(ctx: SimResourceContext) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += _work
+ counters.actual += _work - remainingWork
+ counters.overcommit += remainingWork
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 0b3f5de1..46c5c63f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -87,6 +87,26 @@ internal class SimResourceContextImpl(
private var _speed = 0.0
/**
+ * The current resource processing demand.
+ */
+ override val demand: Double
+ get() = _limit
+
+ private val counters = object : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ /**
* The current state of the resource context.
*/
private var _timestamp: Long = Long.MIN_VALUE
@@ -145,7 +165,7 @@ internal class SimResourceContextImpl(
return
}
- doUpdate(clock.millis())
+ interpreter.scheduleSync(this)
}
/**
@@ -206,6 +226,11 @@ internal class SimResourceContextImpl(
val remainingWork = remainingWork
val isConsume = _limit > 0.0
+ // Update the resource counters only if there is some progress
+ if (timestamp > _timestamp) {
+ logic.onUpdate(this, _work)
+ }
+
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
// 2. The resource capacity cannot satisfy the demand.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
new file mode 100644
index 00000000..827019c5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.impl
+
+import org.opendc.simulator.resources.SimResourceCounters
+
+/**
+ * Mutable implementation of the [SimResourceCounters] interface.
+ */
+internal class SimResourceCountersImpl : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
index d09e1b45..cb0d6160 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -61,6 +61,11 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
private val futureInvocations = ArrayDeque<Invocation>()
/**
+ * The systems that have been visited during the interpreter cycle.
+ */
+ private val visited = linkedSetOf<SimResourceSystem>()
+
+ /**
* The index in the batch stack.
*/
private var batchIndex = 0
@@ -95,6 +100,30 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
+ * Update the specified [ctx] synchronously.
+ */
+ fun scheduleSync(ctx: SimResourceContextImpl) {
+ ctx.doUpdate(clock.millis())
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
+ }
+
+ // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active interpreter.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
* Schedule the interpreter to run at [timestamp] to update the resource contexts.
*
* This method will override earlier calls to this method for the same [ctx].
@@ -148,7 +177,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
val queue = queue
val futureQueue = futureQueue
val futureInvocations = futureInvocations
- val visited = linkedSetOf<SimResourceSystem>()
+ val visited = visited
// Execute all scheduled updates at current timestamp
while (true) {
@@ -183,6 +212,8 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
for (system in visited) {
system.onConverge(now)
}
+
+ visited.clear()
} while (queue.isNotEmpty())
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 994ae888..51024e80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -59,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest {
source.startConsumer(adapter)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertAll(
@@ -67,7 +67,7 @@ internal class SimResourceAggregatorMaxMinTest {
{ assertEquals(listOf(0.0, 0.5, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -87,14 +87,14 @@ internal class SimResourceAggregatorMaxMinTest {
val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
try {
- aggregator.output.consume(adapter)
+ aggregator.consume(adapter)
yield()
assertAll(
{ assertEquals(1000, clock.millis()) },
{ assertEquals(listOf(0.0, 2.0, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -115,13 +115,13 @@ internal class SimResourceAggregatorMaxMinTest {
.andThen(SimResourceCommand.Exit)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertEquals(1000, clock.millis())
verify(exactly = 2) { consumer.onNext(any()) }
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -142,11 +142,11 @@ internal class SimResourceAggregatorMaxMinTest {
.andThenThrows(IllegalStateException("Test Exception"))
try {
- assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ assertThrows<IllegalStateException> { aggregator.consume(consumer) }
yield()
assertEquals(SimResourceState.Pending, sources[0].state)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -164,14 +164,14 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(4.0, 1.0)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(1000)
sources[0].capacity = 0.5
}
yield()
assertEquals(2334, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -189,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(1.0, 0.5)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(500)
sources[0].capacity = 0.5
}
yield()
assertEquals(1000, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
+ }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
+ val sources = listOf(
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+ assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
+ } finally {
+ aggregator.close()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 517dcb36..ad8d82e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -67,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest {
source.startConsumer(adapter)
switch.addInput(forwarder)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -98,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -142,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -170,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- switch.addOutput(3200.0)
- assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ switch.newOutput()
+ assertThrows<IllegalStateException> { switch.newOutput() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 0b023878..e4292ec0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -47,7 +47,7 @@ internal class SimResourceSwitchMaxMinTest {
val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
- val provider = switch.addOutput(1000.0)
+ val provider = switch.newOutput()
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
@@ -67,26 +67,6 @@ internal class SimResourceSwitchMaxMinTest {
fun testOvercommittedSingle() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workload =
SimTraceConsumer(
@@ -98,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(scheduler, null, listener)
- val provider = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val provider = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -110,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest {
}
assertAll(
- { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -124,26 +104,6 @@ internal class SimResourceSwitchMaxMinTest {
fun testOvercommittedDual() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workloadA =
SimTraceConsumer(
@@ -164,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(scheduler, null, listener)
- val providerA = switch.addOutput(3200.0)
- val providerB = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val providerA = switch.newOutput()
+ val providerB = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -181,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest {
switch.close()
}
assertAll(
- { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index 04886399..810052b8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -206,4 +206,21 @@ internal class SimResourceTransformerTest {
assertEquals(0, clock.millis())
verify(exactly = 1) { consumer.onNext(any()) }
}
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
}