summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-02 13:03:10 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-02 15:49:07 +0200
commitcc87c9ad0b8e4ed3fa4fbad4ab94c5e53948ef3c (patch)
tree297cfbd39a197ce27016eec869cde1f374453f32 /opendc-simulator
parent9e5e830e15b74f040708e98c09ea41cd96d13871 (diff)
simulator: Add uniform interface for resource metrics
This change adds a new interface to the resources library for accessing metrics of resources such as work, demand and overcommitted work. With this change, we do not need an implementation specific listener interface in SimResourceSwitchMaxMin anymore. Another benefit of this approach is that updates will be scheduled more efficiently and progress will only be reported once the system has reached a steady-state for that timestamp.
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt16
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt31
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt61
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt105
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt39
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt12
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt246
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt42
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt33
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt50
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt64
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt17
32 files changed, 625 insertions, 439 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index 2df307d3..2b84a17c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -166,13 +166,23 @@ public abstract class SimAbstractHypervisor(private val interpreter: SimResource
/**
* The actual resource supporting the processing unit.
*/
- private val source = switch.addOutput(model.frequency)
-
- override val speed: Double = 0.0 /* TODO Implement */
+ private val source = switch.newOutput()
override val state: SimResourceState
get() = source.state
+ override val capacity: Double
+ get() = source.capacity
+
+ override val speed: Double
+ get() = source.speed
+
+ override val demand: Double
+ get() = source.demand
+
+ override val counters: SimResourceCounters
+ get() = source.counters
+
override fun startConsumer(consumer: SimResourceConsumer) {
source.startConsumer(consumer)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index de2b3eef..d40cdac5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -37,8 +37,12 @@ import java.time.Clock
* Abstract implementation of the [SimMachine] interface.
*
* @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
*/
-public abstract class SimAbstractMachine(protected val interpreter: SimResourceInterpreter) : SimMachine, SimResourceSystem {
+public abstract class SimAbstractMachine(
+ protected val interpreter: SimResourceInterpreter,
+ final override val parent: SimResourceSystem?
+) : SimMachine, SimResourceSystem {
private val _usage = MutableStateFlow(0.0)
override val usage: StateFlow<Double>
get() = _usage
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index f5218ba9..082719e2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -28,25 +28,27 @@ import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.resources.*
import org.opendc.simulator.resources.SimResourceInterpreter
-import kotlin.coroutines.*
/**
* A simulated bare-metal machine that is able to run a single workload.
*
* A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For
- * example. the class expects only a single concurrent call to [run].
+ * example. The class expects only a single concurrent call to [run].
*
- * @param context The [CoroutineContext] to run the simulated workload in.
- * @param clock The virtual clock to track the simulation time.
+ * @param interpreter The [SimResourceInterpreter] to drive the simulation.
* @param model The machine model to simulate.
+ * @param scalingGovernor The CPU frequency scaling governor to use.
+ * @param scalingDriver The CPU frequency scaling driver to use.
+ * @param parent The parent simulation system.
*/
@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
public class SimBareMetalMachine(
- platform: SimResourceInterpreter,
+ interpreter: SimResourceInterpreter,
override val model: SimMachineModel,
scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver
-) : SimAbstractMachine(platform) {
+ scalingDriver: ScalingDriver,
+ parent: SimResourceSystem? = null,
+) : SimAbstractMachine(interpreter, parent) {
override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
/**
@@ -61,8 +63,6 @@ public class SimBareMetalMachine(
scalingGovernor.createLogic(this.scalingDriver.createContext(cpu))
}
- override val parent: SimResourceSystem? = null
-
init {
scalingGovernors.forEach { it.onStart() }
}
@@ -89,11 +89,20 @@ public class SimBareMetalMachine(
*/
private val source = SimResourceSource(model.frequency, interpreter, this@SimBareMetalMachine)
+ override val state: SimResourceState
+ get() = source.state
+
+ override val capacity: Double
+ get() = source.capacity
+
override val speed: Double
get() = source.speed
- override val state: SimResourceState
- get() = source.state
+ override val demand: Double
+ get() = source.demand
+
+ override val counters: SimResourceCounters
+ get() = source.counters
override fun startConsumer(consumer: SimResourceConsumer) {
source.startConsumer(consumer)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 33e7e637..3ceb3291 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -26,33 +26,62 @@ import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.simulator.resources.SimResourceSwitch
import org.opendc.simulator.resources.SimResourceSwitchMaxMin
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
* [SimBareMetalMachine] concurrently using weighted fair sharing.
*
+ * @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val interpreter: SimResourceInterpreter, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor(interpreter) {
+public class SimFairShareHypervisor(
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null,
+ private val listener: SimHypervisor.Listener? = null
+) : SimAbstractHypervisor(interpreter) {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
- return SimResourceSwitchMaxMin(
- interpreter, null,
- object : SimResourceSwitchMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
+ return SwitchSystem().switch
+ }
+
+ private inner class SwitchSystem : SimResourceSystem {
+ val switch = SimResourceSwitchMaxMin(interpreter, this)
+
+ override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
+
+ private var lastCpuUsage = 0.0
+ private var lastCpuDemand = 0.0
+ private var lastDemand = 0.0
+ private var lastActual = 0.0
+ private var lastOvercommit = 0.0
+ private var lastReport = Long.MIN_VALUE
+
+ override fun onConverge(timestamp: Long) {
+ val listener = listener ?: return
+ val counters = switch.counters
+
+ if (timestamp > lastReport) {
+ listener.onSliceFinish(
+ this@SimFairShareHypervisor,
+ (counters.demand - lastDemand).toLong(),
+ (counters.actual - lastActual).toLong(),
+ (counters.overcommit - lastOvercommit).toLong(),
+ 0L,
+ lastCpuUsage,
+ lastCpuDemand
+ )
}
- )
+ lastReport = timestamp
+
+ lastCpuDemand = switch.inputs.sumOf { it.demand }
+ lastCpuUsage = switch.inputs.sumOf { it.speed }
+ lastDemand = counters.demand
+ lastActual = counters.actual
+ lastOvercommit = counters.overcommit
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 68858cc1..d3206196 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimFairShareHypervisor(interpreter, listener)
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index d0753084..8e8c3698 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -39,5 +40,9 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null,
+ listener: SimHypervisor.Listener? = null
+ ): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
index 13c7d9b2..136a543a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
@@ -33,9 +33,4 @@ public interface SimProcessingUnit : SimResourceProvider {
* The model representing the static properties of the processing unit.
*/
public val model: ProcessingUnit
-
- /**
- * The current speed of the processing unit.
- */
- public val speed: Double
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index c017c8ab..923b5bab 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.compute
import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -30,7 +31,9 @@ import org.opendc.simulator.resources.SimResourceInterpreter
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(interpreter: SimResourceInterpreter, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimSpaceSharedHypervisor(interpreter)
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index c1b5089c..1709cc23 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -96,7 +96,7 @@ internal class SimHypervisorTest {
val platform = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(platform, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(platform, listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, listener)
launch {
machine.run(hypervisor)
@@ -171,7 +171,7 @@ internal class SimHypervisorTest {
platform, model, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(platform, listener)
+ val hypervisor = SimFairShareHypervisor(platform, null, listener)
launch {
machine.run(hypervisor)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index 9233c72d..b45b2a2f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -90,7 +90,7 @@ class SimResourceBenchmarks {
switch.addInput(SimResourceSource(3000.0, interpreter))
switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -105,7 +105,7 @@ class SimResourceBenchmarks {
repeat(3) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -120,7 +120,7 @@ class SimResourceBenchmarks {
switch.addInput(SimResourceSource(3000.0, interpreter))
switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -135,7 +135,7 @@ class SimResourceBenchmarks {
repeat(2) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index be04d399..5fe7d7bb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -42,7 +42,7 @@ public abstract class SimAbstractResourceAggregator(
/**
* This method is invoked when the resource consumer finishes processing.
*/
- protected abstract fun doFinish(cause: Throwable?)
+ protected abstract fun doFinish()
/**
* This method is invoked when an input context is started.
@@ -54,8 +54,9 @@ public abstract class SimAbstractResourceAggregator(
*/
protected abstract fun onInputFinished(input: Input)
+ /* SimResourceAggregator */
override fun addInput(input: SimResourceProvider) {
- check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+ check(state != SimResourceState.Stopped) { "Aggregator has been stopped" }
val consumer = Consumer()
_inputs.add(input)
@@ -63,44 +64,77 @@ public abstract class SimAbstractResourceAggregator(
input.startConsumer(consumer)
}
- override fun close() {
- output.close()
- }
-
- override val output: SimResourceProvider
- get() = _output
- private val _output = SimResourceForwarder()
-
override val inputs: Set<SimResourceProvider>
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
private val _inputConsumers = mutableListOf<Consumer>()
- protected val outputContext: SimResourceContext
- get() = context
- private val context = interpreter.newContext(
- _output,
- object : SimResourceProviderLogic {
- override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- doIdle(deadline)
- return Long.MAX_VALUE
- }
+ /* SimResourceProvider */
+ override val state: SimResourceState
+ get() = _output.state
- override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- doConsume(work, limit, deadline)
- return Long.MAX_VALUE
- }
+ override val capacity: Double
+ get() = _output.capacity
- override fun onFinish(ctx: SimResourceControllableContext) {
- doFinish(null)
- }
+ override val speed: Double
+ get() = _output.speed
+
+ override val demand: Double
+ get() = _output.demand
+
+ override val counters: SimResourceCounters
+ get() = _output.counters
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ _output.startConsumer(consumer)
+ }
+
+ override fun cancel() {
+ _output.cancel()
+ }
+
+ override fun interrupt() {
+ _output.interrupt()
+ }
+
+ override fun close() {
+ _output.close()
+ }
+
+ private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
+ override fun createLogic(): SimResourceProviderLogic {
+ return object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
+ doIdle(deadline)
+ return Long.MAX_VALUE
+ }
+
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ doConsume(work, limit, deadline)
+ return Long.MAX_VALUE
+ }
+
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ doFinish()
+ }
+
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
- override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- return _inputConsumers.sumOf { it.remainingWork }
+ override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return _inputConsumers.sumOf { it.remainingWork }
+ }
}
- },
- parent
- )
+ }
+
+ /**
+ * Flush the progress of the output if possible.
+ */
+ fun flush() {
+ ctx?.flush()
+ }
+ }
/**
* An input for the resource aggregator.
@@ -141,7 +175,7 @@ public abstract class SimAbstractResourceAggregator(
private fun updateCapacity() {
// Adjust capacity of output resource
- context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
+ _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
}
/* Input */
@@ -158,7 +192,7 @@ public abstract class SimAbstractResourceAggregator(
this.command = null
next
} else {
- context.flush()
+ _output.flush()
next = command
this.command = null
@@ -172,11 +206,6 @@ public abstract class SimAbstractResourceAggregator(
_ctx = ctx
updateCapacity()
- // Make sure we initialize the output if we have not done so yet
- if (context.state == SimResourceState.Pending) {
- context.start()
- }
-
onInputStarted(this)
}
SimResourceEvent.Capacity -> updateCapacity()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index 519c2615..de26f99e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -22,27 +22,49 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
/**
* Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations.
*/
public abstract class SimAbstractResourceProvider(
private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
+ private val parent: SimResourceSystem?,
initialCapacity: Double
) : SimResourceProvider {
/**
* The capacity of the resource.
*/
- public open var capacity: Double = initialCapacity
- protected set(value) {
+ public override var capacity: Double = initialCapacity
+ set(value) {
field = value
ctx?.capacity = value
}
/**
+ * The current processing speed of the resource.
+ */
+ public override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
+ /**
* The [SimResourceControllableContext] that is currently running.
*/
protected var ctx: SimResourceControllableContext? = null
+ private set
/**
* The state of the resource provider.
@@ -62,6 +84,17 @@ public abstract class SimAbstractResourceProvider(
ctx.start()
}
+ /**
+ * Update the counters of the resource provider.
+ */
+ protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += work
+ counters.actual += work - remainingWork
+ counters.overcommit += remainingWork
+ }
+
final override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
val ctx = interpreter.newContext(consumer, createLogic(), parent)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
index 5c0346cd..00972f43 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -25,12 +25,7 @@ package org.opendc.simulator.resources
/**
* A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
*/
-public interface SimResourceAggregator : AutoCloseable {
- /**
- * The output resource provider to which resource consumers can be attached.
- */
- public val output: SimResourceProvider
-
+public interface SimResourceAggregator : SimResourceProvider {
/**
* The input resources that will be switched between the output providers.
*/
@@ -40,9 +35,4 @@ public interface SimResourceAggregator : AutoCloseable {
* Add the specified [input] to the aggregator.
*/
public fun addInput(input: SimResourceProvider)
-
- /**
- * End the lifecycle of the aggregator.
- */
- public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index bdab6def..c39c1aca 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -38,7 +38,7 @@ public class SimResourceAggregatorMaxMin(
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
val inputCapacity = input.ctx.capacity
- val fraction = inputCapacity / outputContext.capacity
+ val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
val grantedWork = fraction * work
@@ -56,7 +56,7 @@ public class SimResourceAggregatorMaxMin(
}
}
- override fun doFinish(cause: Throwable?) {
+ override fun doFinish() {
val iterator = consumers.iterator()
for (input in iterator) {
iterator.remove()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 7c76c634..0d9a6106 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -45,6 +45,11 @@ public interface SimResourceContext {
public val speed: Double
/**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
* The amount of work still remaining at this instant.
*/
public val remainingWork: Double
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
new file mode 100644
index 00000000..725aa5bc
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * An interface that tracks cumulative counts of the work performed by a resource.
+ */
+public interface SimResourceCounters {
+ /**
+ * The amount of work that resource consumers wanted the resource to perform.
+ */
+ public val demand: Double
+
+ /**
+ * The amount of work performed by the resource.
+ */
+ public val actual: Double
+
+ /**
+ * The amount of work that could not be completed due to overcommitted resources.
+ */
+ public val overcommit: Double
+
+ /**
+ * Reset the resource counters.
+ */
+ public fun reset()
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index 8dd1bd2b..e0333ff9 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -25,19 +25,14 @@ package org.opendc.simulator.resources
/**
* A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
*/
-public interface SimResourceDistributor : AutoCloseable {
+public interface SimResourceDistributor : SimResourceConsumer {
/**
* The output resource providers to which resource consumers can be attached.
*/
public val outputs: Set<SimResourceProvider>
/**
- * The input resource that will be distributed over the consumers.
+ * Create a new output for the distributor.
*/
- public val input: SimResourceProvider
-
- /**
- * Create a new output for the distributor with the specified [capacity].
- */
- public fun addOutput(capacity: Double): SimResourceProvider
+ public fun newOutput(): SimResourceProvider
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index e99f5eff..be9e89fb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -29,24 +29,22 @@ import kotlin.math.min
* A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
*/
public class SimResourceDistributorMaxMin(
- override val input: SimResourceProvider,
private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem? = null,
- private val listener: Listener? = null
+ private val parent: SimResourceSystem? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
get() = _outputs
private val _outputs = mutableSetOf<Output>()
/**
- * The active outputs.
+ * The resource context of the consumer.
*/
- private val activeOutputs: MutableList<Output> = mutableListOf()
+ private var ctx: SimResourceContext? = null
/**
- * The total speed requested by the output resources.
+ * The active outputs.
*/
- private var totalRequestedSpeed = 0.0
+ private val activeOutputs: MutableList<Output> = mutableListOf()
/**
* The total amount of work requested by the output resources.
@@ -58,145 +56,83 @@ public class SimResourceDistributorMaxMin(
*/
private var totalAllocatedSpeed = 0.0
- /**
- * The total allocated work requested for the output resources.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
- */
- private var totalInterferedWork = 0.0
-
- /**
- * The timestamp of the last report.
- */
- private var lastReport: Long = Long.MIN_VALUE
-
- /**
- * A flag to indicate that the switch is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private val consumer = object : SimResourceConsumer {
- /**
- * The resource context of the consumer.
- */
- private lateinit var ctx: SimResourceContext
-
- val remainingWork: Double
- get() = ctx.remainingWork
-
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return doNext(ctx.capacity)
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- }
- SimResourceEvent.Exit -> {
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
-
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
-
- output.close()
- }
- }
- else -> {}
- }
- }
+ /* SimResourceDistributor */
+ override fun newOutput(): SimResourceProvider {
+ val provider = Output(ctx?.capacity ?: 0.0)
+ _outputs.add(provider)
+ return provider
}
- /**
- * The total amount of remaining work.
- */
- private val totalRemainingWork: Double
- get() = consumer.remainingWork
-
- init {
- input.startConsumer(consumer)
+ /* SimResourceConsumer */
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
}
- override fun addOutput(capacity: Double): SimResourceProvider {
- check(!isClosed) { "Distributor has been closed" }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ updateCapacity(ctx)
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
- val provider = Output(capacity)
- _outputs.add(provider)
- return provider
- }
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
- override fun close() {
- if (!isClosed) {
- isClosed = true
- input.cancel()
+ output.close()
+ }
+ }
+ SimResourceEvent.Capacity -> updateCapacity(ctx)
+ else -> {}
}
}
/**
- * Schedule the work over the physical CPUs.
+ * Schedule the work of the outputs.
*/
- private fun doSchedule(capacity: Double): SimResourceCommand {
- // If there is no work yet, mark all inputs as idle.
+ private fun doNext(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
return SimResourceCommand.Idle()
}
- val maxUsage = capacity
var duration: Double = Double.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
+ var availableSpeed = capacity
var totalRequestedSpeed = 0.0
var totalRequestedWork = 0.0
- // Flush the work of the outputs
- var outputIterator = activeOutputs.listIterator()
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
-
+ // Pull in the work of the outputs
+ val outputIterator = activeOutputs.listIterator()
+ for (output in outputIterator) {
output.pull()
+ // Remove outputs that have finished
if (output.isFinished) {
- // The output consumer has exited, so remove it from the scheduling queue.
outputIterator.remove()
}
}
- // Sort the outputs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
+ // Sort in-place the outputs based on their requested usage.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
activeOutputs.sort()
// Divide the available input capacity fairly across the outputs using max-min fair sharing
- outputIterator = activeOutputs.listIterator()
var remaining = activeOutputs.size
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
+ for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
when (val command = output.activeCommand) {
is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
-
output.actualSpeed = 0.0
}
is SimResourceCommand.Consume -> {
val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
// Ignore idle computation
@@ -211,7 +147,7 @@ public class SimResourceDistributorMaxMin(
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
- // The duration that we want to run is that of the shortest request from an output
+ // The duration that we want to run is that of the shortest request of an output
duration = min(duration, command.work / grantedSpeed)
}
SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
@@ -220,67 +156,23 @@ public class SimResourceDistributorMaxMin(
assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
- this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = maxUsage - availableSpeed
- this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration))
+ this.totalAllocatedSpeed = capacity - availableSpeed
+ val totalAllocatedWork = min(
+ totalRequestedWork,
+ totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)
+ )
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline)
else
SimResourceCommand.Idle(deadline)
}
- /**
- * Obtain the next command to perform.
- */
- private fun doNext(capacity: Double): SimResourceCommand {
- val totalRequestedWork = totalRequestedWork.toLong()
- val totalAllocatedWork = totalAllocatedWork.toLong()
- val totalRemainingWork = totalRemainingWork.toLong()
- val totalRequestedSpeed = totalRequestedSpeed
- val totalAllocatedSpeed = totalAllocatedSpeed
-
- // Force all inputs to re-schedule their work.
- val command = doSchedule(capacity)
-
- val now = interpreter.clock.millis()
- if (lastReport < now) {
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork,
- totalAllocatedWork - totalRemainingWork,
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalAllocatedSpeed,
- totalRequestedSpeed
- )
- lastReport = now
+ private fun updateCapacity(ctx: SimResourceContext) {
+ for (output in _outputs) {
+ output.capacity = ctx.capacity
}
-
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
-
- return command
- }
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
}
/**
@@ -322,7 +214,7 @@ public class SimResourceDistributorMaxMin(
interpreter.batch {
ctx.start()
// Interrupt the input to re-schedule the resources
- input.interrupt()
+ this@SimResourceDistributorMaxMin.ctx?.interrupt()
}
}
@@ -338,8 +230,6 @@ public class SimResourceDistributorMaxMin(
/* SimResourceProviderLogic */
override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
- reportOvercommit(ctx.remainingWork)
-
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
lastCommandTimestamp = ctx.clock.millis()
@@ -348,8 +238,6 @@ public class SimResourceDistributorMaxMin(
}
override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
- reportOvercommit(ctx.remainingWork)
-
allowedSpeed = ctx.speed
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
lastCommandTimestamp = ctx.clock.millis()
@@ -357,31 +245,25 @@ public class SimResourceDistributorMaxMin(
return Long.MAX_VALUE
}
- override fun onFinish(ctx: SimResourceControllableContext) {
- reportOvercommit(ctx.remainingWork)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+ override fun onFinish(ctx: SimResourceControllableContext) {
activeCommand = SimResourceCommand.Exit
lastCommandTimestamp = ctx.clock.millis()
}
override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
- // Apply performance interference model
- val performanceScore = 1.0
+ val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0
- // Compute the remaining amount of work
return if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
+ // Compute the fraction of compute time allocated to the output
val fraction = actualSpeed / totalAllocatedSpeed
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
+ // Compute the work that was actually granted to the output.
+ val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction
+ max(0.0, work - processingAvailable)
} else {
0.0
}
@@ -399,9 +281,5 @@ public class SimResourceDistributorMaxMin(
ctx.flush()
}
}
-
- private fun reportOvercommit(remainingWork: Double) {
- totalOvercommittedWork += remainingWork
- }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 2f567a5e..f709ca17 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -27,7 +27,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
- * A [SimResourceProvider] provides some resource of type [R].
+ * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
*/
public interface SimResourceProvider : AutoCloseable {
/**
@@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable {
public val state: SimResourceState
/**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public val speed: Double
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public val counters: SimResourceCounters
+
+ /**
* Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
*
* @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index 22676984..5231ecf5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -29,7 +29,7 @@ import kotlin.math.max
*/
public interface SimResourceProviderLogic {
/**
- * This method is invoked when the resource will idle until the specified [deadline].
+ * This method is invoked when the resource is reported to idle until the specified [deadline].
*
* @param ctx The context in which the provider runs.
* @param deadline The deadline that was requested by the resource consumer.
@@ -38,8 +38,8 @@ public interface SimResourceProviderLogic {
public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long
/**
- * This method is invoked when the resource will be consumed until the specified [work] was processed or the
- * [deadline] was reached.
+ * This method is invoked when the resource will be consumed until the specified amount of [work] was processed
+ * or [deadline] is reached.
*
* @param ctx The context in which the provider runs.
* @param work The amount of work that was requested by the resource consumer.
@@ -50,6 +50,14 @@ public interface SimResourceProviderLogic {
public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long
/**
+ * This method is invoked when the progress of the resource consumer is materialized.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param work The amount of work that was requested by the resource consumer.
+ */
+ public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {}
+
+ /**
* This method is invoked when the resource consumer has finished.
*/
public fun onFinish(ctx: SimResourceControllableContext)
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index d984d2a5..9f062cc3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -37,21 +37,6 @@ public class SimResourceSource(
private val interpreter: SimResourceInterpreter,
private val parent: SimResourceSystem? = null
) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) {
- /**
- * The current processing speed of the resource.
- */
- public val speed: Double
- get() = ctx?.speed ?: 0.0
-
- /**
- * The capacity of the resource.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
@@ -62,6 +47,10 @@ public class SimResourceSource(
return min(deadline, ctx.clock.millis() + getDuration(work, speed))
}
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+
override fun onFinish(ctx: SimResourceControllableContext) {
cancel()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index 53fec16a..e224285e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable {
public val inputs: Set<SimResourceProvider>
/**
- * Add an output to the switch with the specified [capacity].
+ * The resource counters to track the execution metrics of all switch resources.
*/
- public fun addOutput(capacity: Double): SimResourceProvider
+ public val counters: SimResourceCounters
+
+ /**
+ * Create a new output on the switch.
+ */
+ public fun newOutput(): SimResourceProvider
/**
* Add the specified [input] to the switch.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 7f1bb2b7..2950af80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
override val inputs: Set<SimResourceProvider>
get() = _inputs
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override val counters: SimResourceCounters = object : SimResourceCounters {
+ override val demand: Double
+ get() = _inputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _inputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _inputs.sumOf { it.counters.overcommit }
+
+ override fun reset() {
+ for (input in _inputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
- val output = Provider(capacity, forwarder)
+ val output = Provider(forwarder)
_outputs += output
return output
}
@@ -84,10 +101,7 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
_inputs.forEach(SimResourceProvider::cancel)
}
- private inner class Provider(
- private val capacity: Double,
- private val forwarder: SimResourceTransformer
- ) : SimResourceProvider by forwarder {
+ private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder {
override fun close() {
// We explicitly do not close the forwarder here in order to re-use it across output resources.
_outputs -= this
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 61887e34..684a1b52 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -28,16 +28,25 @@ package org.opendc.simulator.resources
*/
public class SimResourceSwitchMaxMin(
interpreter: SimResourceInterpreter,
- parent: SimResourceSystem? = null,
- private val listener: Listener? = null
+ parent: SimResourceSystem? = null
) : SimResourceSwitch {
- private val _outputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
override val outputs: Set<SimResourceProvider>
- get() = _outputs
+ get() = distributor.outputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The input resources that will be switched between the output providers.
+ */
override val inputs: Set<SimResourceProvider>
- get() = _inputs
+ get() = aggregator.inputs
+
+ /**
+ * The resource counters to track the execution metrics of all switch resources.
+ */
+ override val counters: SimResourceCounters
+ get() = aggregator.counters
/**
* A flag to indicate that the switch was closed.
@@ -52,32 +61,19 @@ public class SimResourceSwitchMaxMin(
/**
* The distributor to distribute the aggregated resources.
*/
- private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, interpreter, parent,
- object : SimResourceDistributorMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
- }
- )
+ private val distributor = SimResourceDistributorMaxMin(interpreter, parent)
+
+ init {
+ aggregator.startConsumer(distributor)
+ }
/**
- * Add an output to the switch with the specified [capacity].
+ * Add an output to the switch.
*/
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
- val provider = distributor.addOutput(capacity)
- _outputs.add(provider)
- return provider
+ return distributor.newOutput()
}
/**
@@ -92,26 +88,7 @@ public class SimResourceSwitchMaxMin(
override fun close() {
if (!isClosed) {
isClosed = true
- distributor.close()
aggregator.close()
}
}
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 32f3f573..fd3d1230 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
/**
* A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
*
@@ -53,6 +55,19 @@ public class SimResourceTransformer(
override var state: SimResourceState = SimResourceState.Pending
private set
+ override val capacity: Double
+ get() = ctx?.capacity ?: 0.0
+
+ override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
@@ -97,10 +112,15 @@ public class SimResourceTransformer(
start()
}
+ updateCounters(ctx)
+
return if (state == SimResourceState.Stopped) {
SimResourceCommand.Exit
} else if (delegate != null) {
val command = transform(ctx, delegate.onNext(ctx))
+
+ _work = if (command is SimResourceCommand.Consume) command.work else 0.0
+
if (command == SimResourceCommand.Exit) {
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
@@ -169,6 +189,22 @@ public class SimResourceTransformer(
state = SimResourceState.Pending
}
}
+
+ /**
+ * Counter to track the current submitted work.
+ */
+ private var _work = 0.0
+
+ /**
+ * Update the resource counters for the transformer.
+ */
+ private fun updateCounters(ctx: SimResourceContext) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += _work
+ counters.actual += _work - remainingWork
+ counters.overcommit += remainingWork
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 0b3f5de1..46c5c63f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -87,6 +87,26 @@ internal class SimResourceContextImpl(
private var _speed = 0.0
/**
+ * The current resource processing demand.
+ */
+ override val demand: Double
+ get() = _limit
+
+ private val counters = object : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ /**
* The current state of the resource context.
*/
private var _timestamp: Long = Long.MIN_VALUE
@@ -145,7 +165,7 @@ internal class SimResourceContextImpl(
return
}
- doUpdate(clock.millis())
+ interpreter.scheduleSync(this)
}
/**
@@ -206,6 +226,11 @@ internal class SimResourceContextImpl(
val remainingWork = remainingWork
val isConsume = _limit > 0.0
+ // Update the resource counters only if there is some progress
+ if (timestamp > _timestamp) {
+ logic.onUpdate(this, _work)
+ }
+
// We should only continue processing the next command if:
// 1. The resource consumption was finished.
// 2. The resource capacity cannot satisfy the demand.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
new file mode 100644
index 00000000..827019c5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.impl
+
+import org.opendc.simulator.resources.SimResourceCounters
+
+/**
+ * Mutable implementation of the [SimResourceCounters] interface.
+ */
+internal class SimResourceCountersImpl : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
index d09e1b45..cb0d6160 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -61,6 +61,11 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
private val futureInvocations = ArrayDeque<Invocation>()
/**
+ * The systems that have been visited during the interpreter cycle.
+ */
+ private val visited = linkedSetOf<SimResourceSystem>()
+
+ /**
* The index in the batch stack.
*/
private var batchIndex = 0
@@ -95,6 +100,30 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
+ * Update the specified [ctx] synchronously.
+ */
+ fun scheduleSync(ctx: SimResourceContextImpl) {
+ ctx.doUpdate(clock.millis())
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
+ }
+
+ // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active interpreter.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
* Schedule the interpreter to run at [timestamp] to update the resource contexts.
*
* This method will override earlier calls to this method for the same [ctx].
@@ -148,7 +177,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
val queue = queue
val futureQueue = futureQueue
val futureInvocations = futureInvocations
- val visited = linkedSetOf<SimResourceSystem>()
+ val visited = visited
// Execute all scheduled updates at current timestamp
while (true) {
@@ -183,6 +212,8 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
for (system in visited) {
system.onConverge(now)
}
+
+ visited.clear()
} while (queue.isNotEmpty())
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 994ae888..51024e80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -59,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest {
source.startConsumer(adapter)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertAll(
@@ -67,7 +67,7 @@ internal class SimResourceAggregatorMaxMinTest {
{ assertEquals(listOf(0.0, 0.5, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -87,14 +87,14 @@ internal class SimResourceAggregatorMaxMinTest {
val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
try {
- aggregator.output.consume(adapter)
+ aggregator.consume(adapter)
yield()
assertAll(
{ assertEquals(1000, clock.millis()) },
{ assertEquals(listOf(0.0, 2.0, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -115,13 +115,13 @@ internal class SimResourceAggregatorMaxMinTest {
.andThen(SimResourceCommand.Exit)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertEquals(1000, clock.millis())
verify(exactly = 2) { consumer.onNext(any()) }
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -142,11 +142,11 @@ internal class SimResourceAggregatorMaxMinTest {
.andThenThrows(IllegalStateException("Test Exception"))
try {
- assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ assertThrows<IllegalStateException> { aggregator.consume(consumer) }
yield()
assertEquals(SimResourceState.Pending, sources[0].state)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -164,14 +164,14 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(4.0, 1.0)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(1000)
sources[0].capacity = 0.5
}
yield()
assertEquals(2334, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@@ -189,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(1.0, 0.5)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(500)
sources[0].capacity = 0.5
}
yield()
assertEquals(1000, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
+ }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
+ val sources = listOf(
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+ assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
+ } finally {
+ aggregator.close()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 517dcb36..ad8d82e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -67,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest {
source.startConsumer(adapter)
switch.addInput(forwarder)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -98,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -142,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -170,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- switch.addOutput(3200.0)
- assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ switch.newOutput()
+ assertThrows<IllegalStateException> { switch.newOutput() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 0b023878..e4292ec0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -47,7 +47,7 @@ internal class SimResourceSwitchMaxMinTest {
val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
- val provider = switch.addOutput(1000.0)
+ val provider = switch.newOutput()
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
@@ -67,26 +67,6 @@ internal class SimResourceSwitchMaxMinTest {
fun testOvercommittedSingle() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workload =
SimTraceConsumer(
@@ -98,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(scheduler, null, listener)
- val provider = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val provider = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -110,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest {
}
assertAll(
- { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -124,26 +104,6 @@ internal class SimResourceSwitchMaxMinTest {
fun testOvercommittedDual() = runBlockingSimulation {
val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
-
val duration = 5 * 60L
val workloadA =
SimTraceConsumer(
@@ -164,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(scheduler, null, listener)
- val providerA = switch.addOutput(3200.0)
- val providerB = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val providerA = switch.newOutput()
+ val providerB = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -181,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest {
switch.close()
}
assertAll(
- { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index 04886399..810052b8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -206,4 +206,21 @@ internal class SimResourceTransformerTest {
assertEquals(0, clock.millis())
verify(exactly = 1) { consumer.onNext(any()) }
}
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
}