summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-03 14:03:12 +0200
committerGitHub <noreply@github.com>2021-06-03 14:03:12 +0200
commit1303fe97510fb7987746722b3261c696f523fbd5 (patch)
treed927dbd4c71a5ea6435f5994e8fa0bc90ef19b2c /opendc-simulator
parentae987fa84b2e061eb9fdfec5561e1c976aaa5a54 (diff)
parentcef12722f03a24a0e1e3b7502fb5e434d93f1664 (diff)
simulator: Improve simulator resource model (#142)
This pull request improves the existing simulator resource model that is at the core of all simulation models in OpenDC. Most importantly, we have changed the way of how metrics are reported by this layer. * Add `SimResourceInterpreter` which is responsible for efficiently scheduling communication between resources in OpenDC. The performance gain is in the 2x-5x range. * Add uniform interface for exposing resource metrics (using `SimResourceCounters`). * Split the CPUFreq subsystem in the compute simulator as it mixed responsibilities of different layers. **Breaking API Changes** * Resource providers now accept a `SimResourceInterpreter` which is responsible for coordinating the communication between resources. * `ScalingGovernor` is not part of the machine layer anymore. Instead, it should move in the OS/Hypervisor layer. * Workloads should now start CPU consumers using `cpu.startConsumer`.
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt28
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt150
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt143
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt110
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt71
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt10
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt)44
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt)13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt)25
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt76
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt12
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt55
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt9
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt43
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt30
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt40
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt)19
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt (renamed from opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt)59
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt38
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt115
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt362
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt131
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt)53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt (renamed from opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt)27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt386
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt99
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt81
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt99
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt74
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt43
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt422
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt (renamed from opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt)22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt331
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt63
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt93
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt27
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt71
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt32
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt5
58 files changed, 2254 insertions, 1657 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 15714aca..fb753de2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -25,17 +25,15 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceScheduler
-import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import org.opendc.simulator.resources.SimResourceInterpreter
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -46,13 +44,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: SimResourceScheduler
+ private lateinit var interpreter: SimResourceInterpreter
private lateinit var machineModel: SimMachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
+ interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -85,8 +83,7 @@ class SimMachineBenchmarks {
fun benchmarkBareMetal(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace))
}
@@ -96,10 +93,9 @@ class SimMachineBenchmarks {
fun benchmarkSpaceSharedHypervisor(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
@@ -118,10 +114,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorSingle(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(scheduler)
+ val hypervisor = SimFairShareHypervisor(interpreter)
launch { machine.run(hypervisor) }
@@ -140,10 +135,9 @@ class SimMachineBenchmarks {
fun benchmarkFairShareHypervisorDouble(state: Workload) {
return scope.runBlockingSimulation {
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(scheduler)
+ val hypervisor = SimFairShareHypervisor(interpreter)
launch { machine.run(hypervisor) }
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
index 713376e7..57c25b86 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractHypervisor.kt
@@ -22,21 +22,22 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
-import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.*
-import java.time.Clock
+import org.opendc.simulator.resources.SimResourceSwitch
/**
* Abstract implementation of the [SimHypervisor] interface.
+ *
+ * @param interpreter The resource interpreter to use.
+ * @param scalingGovernor The scaling governor to use for scaling the CPU frequency of the underlying hardware.
*/
-public abstract class SimAbstractHypervisor : SimHypervisor {
+public abstract class SimAbstractHypervisor(
+ private val interpreter: SimResourceInterpreter,
+ private val scalingGovernor: ScalingGovernor?
+) : SimHypervisor {
/**
* The machine on which the hypervisor runs.
*/
@@ -55,6 +56,11 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
get() = _vms
/**
+ * The scaling governors attached to the physical CPUs backing this hypervisor.
+ */
+ private val governors = mutableListOf<ScalingGovernor.Logic>()
+
+ /**
* Construct the [SimResourceSwitch] implementation that performs the actual scheduling of the CPUs.
*/
public abstract fun createSwitch(ctx: SimMachineContext): SimResourceSwitch
@@ -64,6 +70,16 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
*/
public abstract fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean
+ /**
+ * Trigger the governors to recompute the scaling limits.
+ */
+ protected fun triggerGovernors(load: Double) {
+ for (governor in governors) {
+ governor.onLimit(load)
+ }
+ }
+
+ /* SimHypervisor */
override fun canFit(model: SimMachineModel): Boolean {
return canFit(model, switch)
}
@@ -78,6 +94,21 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
return vm
}
+ /* SimWorkload */
+ override fun onStart(ctx: SimMachineContext) {
+ context = ctx
+ switch = createSwitch(ctx)
+
+ for (cpu in ctx.cpus) {
+ val governor = scalingGovernor?.createLogic(cpu)
+ if (governor != null) {
+ governors.add(governor)
+ governor.onStart()
+ }
+ switch.addInput(cpu)
+ }
+ }
+
/**
* A virtual machine running on the hypervisor.
*
@@ -85,105 +116,34 @@ public abstract class SimAbstractHypervisor : SimHypervisor {
* @property performanceInterferenceModel The performance interference model to utilize.
*/
private inner class VirtualMachine(
- override val model: SimMachineModel,
+ model: SimMachineModel,
val performanceInterferenceModel: PerformanceInterferenceModel? = null,
- ) : SimMachine {
- /**
- * A [StateFlow] representing the CPU usage of the simulated machine.
- */
- override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
-
- /**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
+ ) : SimAbstractMachine(interpreter, parent = null, model) {
/**
* The vCPUs of the machine.
*/
- private val cpus = model.cpus.map { ProcessingUnitImpl(it, switch) }
-
- /**
- * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
- */
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
- coroutineScope {
- require(!isTerminated) { "Machine is terminated" }
-
- val ctx = object : SimMachineContext {
- override val cpus: List<SimProcessingUnit> = this@VirtualMachine.cpus
-
- override val memory: List<MemoryUnit>
- get() = model.memory
+ override val cpus = model.cpus.map { VCpu(switch.newOutput(), it) }
- override val clock: Clock
- get() = this@SimAbstractHypervisor.context.clock
-
- override val meta: Map<String, Any> = meta
- }
-
- workload.onStart(ctx)
-
- for (cpu in cpus) {
- launch {
- cpu.consume(workload.getConsumer(ctx, cpu.model))
- }
- }
- }
- }
-
- /**
- * Terminate this VM instance.
- */
override fun close() {
- if (!isTerminated) {
- isTerminated = true
+ super.close()
- cpus.forEach(SimProcessingUnit::close)
- _vms.remove(this)
- }
+ _vms.remove(this)
}
}
- override fun onStart(ctx: SimMachineContext) {
- context = ctx
- switch = createSwitch(ctx)
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- val forwarder = SimResourceForwarder()
- switch.addInput(forwarder)
- return forwarder
- }
-
/**
- * The [SimProcessingUnit] of this machine.
+ * A [SimProcessingUnit] of a virtual machine.
*/
- public inner class ProcessingUnitImpl(override val model: ProcessingUnit, switch: SimResourceSwitch) : SimProcessingUnit {
- /**
- * The actual resource supporting the processing unit.
- */
- private val source = switch.addOutput(model.frequency)
-
- override val speed: Double = 0.0 /* TODO Implement */
-
- override val state: SimResourceState
- get() = source.state
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- source.startConsumer(consumer)
- }
-
- override fun interrupt() {
- source.interrupt()
- }
-
- override fun cancel() {
- source.cancel()
- }
+ private class VCpu(
+ private val source: SimResourceProvider,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, SimResourceProvider by source {
+ override var capacity: Double
+ get() = source.capacity
+ set(_) {
+ // Ignore capacity changes
+ }
- override fun close() {
- source.close()
- }
+ override fun toString(): String = "SimAbstractHypervisor.VCpu[model=$model]"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index e501033a..e12ac72b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -27,17 +27,27 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.consume
-import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.*
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
/**
* Abstract implementation of the [SimMachine] interface.
+ *
+ * @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
+ * @param model The model of the machine.
*/
-public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine {
+public abstract class SimAbstractMachine(
+ protected val interpreter: SimResourceInterpreter,
+ final override val parent: SimResourceSystem?,
+ final override val model: SimMachineModel
+) : SimMachine, SimResourceSystem {
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
private val _usage = MutableStateFlow(0.0)
- override val usage: StateFlow<Double>
+ public final override val usage: StateFlow<Double>
get() = _usage
/**
@@ -48,67 +58,76 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
private var _speed = doubleArrayOf()
/**
- * A flag to indicate that the machine is terminated.
- */
- private var isTerminated = false
-
- /**
- * The [CoroutineContext] to run in.
- */
- protected abstract val context: CoroutineContext
-
- /**
* The resources allocated for this machine.
*/
protected abstract val cpus: List<SimProcessingUnit>
/**
- * The execution context in which the workload runs.
+ * A flag to indicate that the machine is terminated.
*/
- private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
- override val clock: Clock
- get() = this@SimAbstractMachine.clock
-
- override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
+ private var isTerminated = false
- override val memory: List<MemoryUnit> = model.memory
- }
+ /**
+ * The continuation to resume when the virtual machine workload has finished.
+ */
+ private var cont: Continuation<Unit>? = null
/**
* Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
*/
- override suspend fun run(workload: SimWorkload, meta: Map<String, Any>): Unit = withContext(context) {
- require(!isTerminated) { "Machine is terminated" }
- val ctx = Context(meta)
- val totalCapacity = model.cpus.sumOf { it.frequency }
+ override suspend fun run(workload: SimWorkload, meta: Map<String, Any>) {
+ check(!isTerminated) { "Machine is terminated" }
+ check(cont == null) { "A machine cannot run concurrently" }
- _speed = DoubleArray(model.cpus.size) { 0.0 }
- var totalSpeed = 0.0
+ val ctx = Context(meta)
// Before the workload starts, initialize the initial power draw
+ _speed = DoubleArray(model.cpus.size) { 0.0 }
updateUsage(0.0)
- workload.onStart(ctx)
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
- for (cpu in cpus) {
- val model = cpu.model
- val consumer = workload.getConsumer(ctx, model)
- val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed ->
- val _speed = _speed
- val _usage = _usage
-
- val oldSpeed = _speed[model.id]
- _speed[model.id] = newSpeed
- totalSpeed = totalSpeed - oldSpeed + newSpeed
-
- val newUsage = totalSpeed / totalCapacity
- if (_usage.value != newUsage) {
- updateUsage(totalSpeed / totalCapacity)
+ // Cancel all cpus on cancellation
+ cont.invokeOnCancellation {
+ this.cont = null
+
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
}
}
- launch { cpu.consume(adapter) }
+ interpreter.batch { workload.onStart(ctx) }
+ }
+ }
+
+ override fun close() {
+ if (isTerminated) {
+ return
+ }
+
+ isTerminated = true
+ cancel()
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.close()
+ }
+ }
+ }
+
+ /* SimResourceSystem */
+ override fun onConverge(timestamp: Long) {
+ val totalCapacity = model.cpus.sumOf { it.frequency }
+ val cpus = cpus
+ var totalSpeed = 0.0
+ for (cpu in cpus) {
+ _speed[cpu.model.id] = cpu.speed
+ totalSpeed += cpu.speed
}
+
+ updateUsage(totalSpeed / totalCapacity)
}
/**
@@ -118,10 +137,34 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
_usage.value = usage
}
- override fun close() {
- if (!isTerminated) {
- isTerminated = true
- cpus.forEach(SimProcessingUnit::close)
+ /**
+ * Cancel the workload that is currently running on the machine.
+ */
+ private fun cancel() {
+ interpreter.batch {
+ for (cpu in cpus) {
+ cpu.cancel()
+ }
+ }
+
+ val cont = cont
+ if (cont != null) {
+ this.cont = null
+ cont.resume(Unit)
}
}
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ private inner class Context(override val meta: Map<String, Any>) : SimMachineContext {
+ override val interpreter: SimResourceInterpreter
+ get() = this@SimAbstractMachine.interpreter
+
+ override val cpus: List<SimProcessingUnit> = this@SimAbstractMachine.cpus
+
+ override val memory: List<MemoryUnit> = model.memory
+
+ override fun close() = cancel()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 27ebba21..5d5d1e5a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -22,62 +22,36 @@
package org.opendc.simulator.compute
-import kotlinx.coroutines.*
-import org.opendc.simulator.compute.cpufreq.ScalingDriver
-import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.resources.*
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
-import kotlin.coroutines.*
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A simulated bare-metal machine that is able to run a single workload.
*
* A [SimBareMetalMachine] is a stateful object and you should be careful when operating this object concurrently. For
- * example. the class expects only a single concurrent call to [run].
+ * example. The class expects only a single concurrent call to [run].
*
- * @param context The [CoroutineContext] to run the simulated workload in.
- * @param clock The virtual clock to track the simulation time.
+ * @param interpreter The [SimResourceInterpreter] to drive the simulation.
* @param model The machine model to simulate.
+ * @param powerDriver The power driver to use.
+ * @param parent The parent simulation system.
*/
-@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class)
public class SimBareMetalMachine(
- context: CoroutineContext,
- private val clock: Clock,
- override val model: SimMachineModel,
- scalingGovernor: ScalingGovernor,
- scalingDriver: ScalingDriver
-) : SimAbstractMachine(clock) {
- /**
- * The [Job] associated with this machine.
- */
- private val scope = CoroutineScope(context + Job())
-
- override val context: CoroutineContext = scope.coroutineContext
-
- /**
- * The [TimerScheduler] to use for scheduling the interrupts.
- */
- private val scheduler = SimResourceSchedulerTrampoline(this.context, clock)
-
- override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
+ interpreter: SimResourceInterpreter,
+ model: SimMachineModel,
+ powerDriver: PowerDriver,
+ parent: SimResourceSystem? = null,
+) : SimAbstractMachine(interpreter, parent, model) {
+ override val cpus: List<SimProcessingUnit> = model.cpus.map { cpu ->
+ Cpu(SimResourceSource(cpu.frequency, interpreter, this@SimBareMetalMachine), cpu)
+ }
/**
- * Construct the [ScalingDriver.Logic] for this machine.
+ * Construct the [PowerDriver.Logic] for this machine.
*/
- private val scalingDriver = scalingDriver.createLogic(this)
-
- /**
- * The scaling contexts associated with each CPU.
- */
- private val scalingGovernors = cpus.map { cpu ->
- scalingGovernor.createLogic(this.scalingDriver.createContext(cpu))
- }
-
- init {
- scalingGovernors.forEach { it.onStart() }
- }
+ private val powerDriver = powerDriver.createLogic(this, cpus)
/**
* The power draw of the machine.
@@ -88,45 +62,25 @@ public class SimBareMetalMachine(
override fun updateUsage(usage: Double) {
super.updateUsage(usage)
- scalingGovernors.forEach { it.onLimit() }
- powerDraw = scalingDriver.computePower()
- }
-
- override fun close() {
- super.close()
-
- scope.cancel()
+ powerDraw = powerDriver.computePower()
}
/**
- * The [SimProcessingUnit] of this machine.
+ * A [SimProcessingUnit] of a bare-metal machine.
*/
- public inner class ProcessingUnitImpl(override val model: ProcessingUnit) : SimProcessingUnit {
- /**
- * The actual resource supporting the processing unit.
- */
- private val source = SimResourceSource(model.frequency, scheduler)
-
- override val speed: Double
- get() = source.speed
-
- override val state: SimResourceState
- get() = source.state
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- source.startConsumer(consumer)
- }
-
- override fun interrupt() {
- source.interrupt()
- }
-
- override fun cancel() {
- source.cancel()
- }
-
- override fun close() {
- source.interrupt()
- }
+ private class Cpu(
+ private val source: SimResourceSource,
+ override val model: ProcessingUnit
+ ) : SimProcessingUnit, SimResourceProvider by source {
+ override var capacity: Double
+ get() = source.capacity
+ set(value) {
+ // Clamp the capacity of the CPU between [0.0, maxFreq]
+ if (value >= 0.0 && value <= model.frequency) {
+ source.capacity = value
+ }
+ }
+
+ override fun toString(): String = "SimBareMetalMachine.Cpu[model=$model]"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index 11aec2de..e7776c81 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -22,35 +22,72 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSwitch
+import org.opendc.simulator.resources.SimResourceSwitchMaxMin
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single
* [SimBareMetalMachine] concurrently using weighted fair sharing.
*
+ * @param interpreter The interpreter to manage the machine's resources.
+ * @param parent The parent simulation system.
+ * @param scalingGovernor The CPU frequency scaling governor to use for the hypervisor.
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
+public class SimFairShareHypervisor(
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null,
+ scalingGovernor: ScalingGovernor? = null,
+ private val listener: SimHypervisor.Listener? = null
+) : SimAbstractHypervisor(interpreter, scalingGovernor) {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
- return SimResourceSwitchMaxMin(
- scheduler,
- object : SimResourceSwitchMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimFairShareHypervisor, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
+ return SwitchSystem(ctx).switch
+ }
+
+ private inner class SwitchSystem(private val ctx: SimMachineContext) : SimResourceSystem {
+ val switch = SimResourceSwitchMaxMin(interpreter, this)
+
+ override val parent: SimResourceSystem? = this@SimFairShareHypervisor.parent
+
+ private var lastCpuUsage = 0.0
+ private var lastCpuDemand = 0.0
+ private var lastDemand = 0.0
+ private var lastActual = 0.0
+ private var lastOvercommit = 0.0
+ private var lastReport = Long.MIN_VALUE
+
+ override fun onConverge(timestamp: Long) {
+ val listener = listener ?: return
+ val counters = switch.counters
+
+ if (timestamp > lastReport) {
+ listener.onSliceFinish(
+ this@SimFairShareHypervisor,
+ (counters.demand - lastDemand).toLong(),
+ (counters.actual - lastActual).toLong(),
+ (counters.overcommit - lastOvercommit).toLong(),
+ 0L,
+ lastCpuUsage,
+ lastCpuDemand
+ )
}
- )
+ lastReport = timestamp
+
+ lastCpuDemand = switch.inputs.sumOf { it.demand }
+ lastCpuUsage = switch.inputs.sumOf { it.speed }
+ lastDemand = counters.demand
+ lastActual = counters.actual
+ lastOvercommit = counters.overcommit
+
+ val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency }
+ triggerGovernors(load)
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 2ab3ea09..94c905b2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -22,9 +22,8 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
@@ -32,7 +31,9 @@ import kotlin.coroutines.CoroutineContext
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener)
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimFairShareHypervisor(interpreter, parent, listener = listener)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index b66020f4..8e8c3698 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A service provider interface for constructing a [SimHypervisor].
@@ -40,5 +40,9 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null,
+ listener: SimHypervisor.Listener? = null
+ ): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
index c2523a2a..5cbabc86 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachineContext.kt
@@ -23,18 +23,18 @@
package org.opendc.simulator.compute
import org.opendc.simulator.compute.model.MemoryUnit
-import java.time.Clock
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A simulated execution context in which a bootable image runs. This interface represents the
* firmware interface between the running image (e.g. operating system) and the physical or virtual firmware on
* which the image runs.
*/
-public interface SimMachineContext {
+public interface SimMachineContext : AutoCloseable {
/**
- * The virtual clock tracking simulation time.
+ * The resource interpreter that simulates the machine.
*/
- public val clock: Clock
+ public val interpreter: SimResourceInterpreter
/**
* The metadata associated with the context.
@@ -50,4 +50,9 @@ public interface SimMachineContext {
* The memory available on the machine
*/
public val memory: List<MemoryUnit>
+
+ /**
+ * Stop the workload.
+ */
+ public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
index 13c7d9b2..93c9ddfa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimProcessingUnit.kt
@@ -30,12 +30,12 @@ import org.opendc.simulator.resources.SimResourceProvider
*/
public interface SimProcessingUnit : SimResourceProvider {
/**
- * The model representing the static properties of the processing unit.
+ * The capacity of the processing unit, which can be adjusted by the workload if supported by the machine.
*/
- public val model: ProcessingUnit
+ public override var capacity: Double
/**
- * The current speed of the processing unit.
+ * The model representing the static properties of the processing unit.
*/
- public val speed: Double
+ public val model: ProcessingUnit
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
index fd8e546f..f6ae18f7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -22,12 +22,14 @@
package org.opendc.simulator.compute
-import org.opendc.simulator.resources.*
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSwitch
+import org.opendc.simulator.resources.SimResourceSwitchExclusive
/**
* A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
*/
-public class SimSpaceSharedHypervisor : SimAbstractHypervisor() {
+public class SimSpaceSharedHypervisor(interpreter: SimResourceInterpreter) : SimAbstractHypervisor(interpreter, null) {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean {
return switch.inputs.size - switch.outputs.size >= model.cpus.size
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index 83b924d7..923b5bab 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -22,8 +22,8 @@
package org.opendc.simulator.compute
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
+import org.opendc.simulator.resources.SimResourceInterpreter
+import org.opendc.simulator.resources.SimResourceSystem
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
@@ -31,7 +31,9 @@ import kotlin.coroutines.CoroutineContext
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
- return SimSpaceSharedHypervisor()
- }
+ override fun create(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?,
+ listener: SimHypervisor.Listener?
+ ): SimHypervisor = SimSpaceSharedHypervisor(interpreter)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt
index 96f8775a..245877be 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernor.kt
@@ -22,13 +22,15 @@
package org.opendc.simulator.compute.cpufreq
+import org.opendc.simulator.compute.SimProcessingUnit
+
/**
* A CPUFreq [ScalingGovernor] that causes the highest possible frequency to be requested from the resource.
*/
public class PerformanceScalingGovernor : ScalingGovernor {
- override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
- override fun onLimit() {
- ctx.setTarget(ctx.cpu.model.frequency)
+ override fun createLogic(cpu: SimProcessingUnit): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
+ override fun onStart() {
+ cpu.capacity = cpu.model.frequency
}
override fun toString(): String = "PerformanceScalingGovernor.Logic"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt
index c9aea580..b7e7ffc6 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingGovernor.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute.cpufreq
+import org.opendc.simulator.compute.SimProcessingUnit
+
/**
* A [ScalingGovernor] in the CPUFreq subsystem of OpenDC is responsible for scaling the frequency of simulated CPUs
* independent of the particular implementation of the CPU.
@@ -33,9 +35,9 @@ package org.opendc.simulator.compute.cpufreq
*/
public interface ScalingGovernor {
/**
- * Create the scaling logic for the specified [context]
+ * Create the scaling logic for the specified [cpu]
*/
- public fun createLogic(ctx: ScalingContext): Logic
+ public fun createLogic(cpu: SimProcessingUnit): Logic
/**
* The logic of the scaling governor.
@@ -48,7 +50,9 @@ public interface ScalingGovernor {
/**
* This method is invoked when the governor should re-decide the frequency limits.
+ *
+ * @param load The load of the system.
*/
- public fun onLimit() {}
+ public fun onLimit(load: Double) {}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
index 6f44d778..6328c8e4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PStatePowerDriver.kt
@@ -20,67 +20,41 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimProcessingUnit
-import org.opendc.simulator.compute.power.PowerModel
import java.util.*
import kotlin.math.max
import kotlin.math.min
/**
- * A [ScalingDriver] that scales the frequency of the processor based on a discrete set of frequencies.
+ * A [PowerDriver] that computes the power draw using multiple [PowerModel]s based on multiple frequency states.
*
* @param states A map describing the states of the driver.
*/
-public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDriver {
+public class PStatePowerDriver(states: Map<Double, PowerModel>) : PowerDriver {
/**
* The P-States defined by the user and ordered by key.
*/
private val states = TreeMap(states)
- override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic {
- /**
- * The scaling contexts.
- */
- private val contexts = mutableListOf<ScalingContextImpl>()
-
- override fun createContext(cpu: SimProcessingUnit): ScalingContext {
- val ctx = ScalingContextImpl(machine, cpu)
- contexts.add(ctx)
- return ctx
- }
-
+ override fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): PowerDriver.Logic = object : PowerDriver.Logic {
override fun computePower(): Double {
var targetFreq = 0.0
var totalSpeed = 0.0
- for (ctx in contexts) {
- targetFreq = max(ctx.target, targetFreq)
- totalSpeed += ctx.cpu.speed
+ for (cpu in cpus) {
+ targetFreq = max(cpu.capacity, targetFreq)
+ totalSpeed += cpu.speed
}
val maxFreq = states.lastKey()
val (actualFreq, model) = states.ceilingEntry(min(maxFreq, targetFreq))
- val utilization = totalSpeed / (actualFreq * contexts.size)
+ val utilization = totalSpeed / (actualFreq * cpus.size)
return model.computePower(utilization)
}
- override fun toString(): String = "PStateScalingDriver.Logic"
- }
-
- private class ScalingContextImpl(
- override val machine: SimMachine,
- override val cpu: SimProcessingUnit
- ) : ScalingContext {
- var target = cpu.model.frequency
- private set
-
- override fun setTarget(freq: Double) {
- target = freq
- }
-
- override fun toString(): String = "PStateScalingDriver.Context"
+ override fun toString(): String = "PStatePowerDriver.Logic"
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt
index b4fd7550..a1a2b911 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingDriver.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/PowerDriver.kt
@@ -20,30 +20,25 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimProcessingUnit
/**
- * A [ScalingDriver] is responsible for switching the processor to the correct frequency.
+ * A [PowerDriver] is responsible for switching the processor to the correct frequency.
*/
-public interface ScalingDriver {
+public interface PowerDriver {
/**
* Create the scaling logic for the specified [machine]
*/
- public fun createLogic(machine: SimMachine): Logic
+ public fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): Logic
/**
* The logic of the scaling driver.
*/
public interface Logic {
/**
- * Create the [ScalingContext] for the specified [cpu] instance.
- */
- public fun createContext(cpu: SimProcessingUnit): ScalingContext
-
- /**
* Compute the power consumption of the processor.
*
* @return The power consumption of the processor in W.
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
index 18338079..5c5ceff5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/ScalingContext.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/power/SimplePowerDriver.kt
@@ -20,27 +20,20 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import org.opendc.simulator.compute.SimMachine
import org.opendc.simulator.compute.SimProcessingUnit
/**
- * A [ScalingContext] is used to communicate frequency scaling changes between the [ScalingGovernor] and driver.
+ * A [PowerDriver] that computes the power consumption based on a single specified [power model][model].
*/
-public interface ScalingContext {
- /**
- * The machine the processing unit belongs to.
- */
- public val machine: SimMachine
+public class SimplePowerDriver(private val model: PowerModel) : PowerDriver {
+ override fun createLogic(machine: SimMachine, cpus: List<SimProcessingUnit>): PowerDriver.Logic = object : PowerDriver.Logic {
+ override fun computePower(): Double {
+ return model.computePower(machine.usage.value)
+ }
- /**
- * The processing unit associated with this context.
- */
- public val cpu: SimProcessingUnit
-
- /**
- * Target the processor to run at the specified target [frequency][freq].
- */
- public fun setTarget(freq: Double)
+ override fun toString(): String = "SimplePowerDriver.Logic"
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt
new file mode 100644
index 00000000..43662d93
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/util/SimWorkloadLifecycle.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.util
+
+import org.opendc.simulator.compute.SimMachineContext
+import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
+
+/**
+ * A helper class to manage the lifecycle of a [SimWorkload]
+ */
+public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
+ /**
+ * The resource consumers which represent the lifecycle of the workload.
+ */
+ private val waiting = mutableSetOf<SimResourceConsumer>()
+
+ /**
+ * Wait for the specified [consumer] to complete before ending the lifecycle of the workload.
+ */
+ public fun waitFor(consumer: SimResourceConsumer): SimResourceConsumer {
+ waiting.add(consumer)
+ return object : SimResourceConsumer by consumer {
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ try {
+ consumer.onEvent(ctx, event)
+ } finally {
+ if (event == SimResourceEvent.Exit) {
+ complete(consumer)
+ }
+ }
+ }
+
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ try {
+ consumer.onFailure(ctx, cause)
+ } finally {
+ complete(consumer)
+ }
+ }
+
+ override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
+ }
+ }
+
+ /**
+ * Complete the specified [SimResourceConsumer].
+ */
+ private fun complete(consumer: SimResourceConsumer) {
+ if (waiting.remove(consumer) && waiting.isEmpty()) {
+ ctx.close()
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
index 63c9d28c..de6832ca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt
@@ -23,8 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
@@ -43,10 +42,11 @@ public class SimFlopsWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- return SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer(flops.toDouble() / ctx.cpus.size, utilization)))
+ }
}
override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
index a3420e32..318a6b49 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt
@@ -23,8 +23,7 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.consumer.SimWorkConsumer
/**
@@ -42,11 +41,12 @@ public class SimRuntimeWorkload(
require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
}
- override fun onStart(ctx: SimMachineContext) {}
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- val limit = cpu.frequency * utilization
- return SimWorkConsumer((limit / 1000) * duration, utilization)
+ override fun onStart(ctx: SimMachineContext) {
+ val lifecycle = SimWorkloadLifecycle(ctx)
+ for (cpu in ctx.cpus) {
+ val limit = cpu.capacity * utilization
+ cpu.startConsumer(lifecycle.waitFor(SimWorkConsumer((limit / 1000) * duration, utilization)))
+ }
}
override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)"
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index ffb332d1..6929f4d2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.util.SimWorkloadLifecycle
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -44,33 +45,12 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
barrier = SimConsumerBarrier(ctx.cpus.size)
fragment = nextFragment()
- offset = ctx.clock.millis()
- }
-
- override fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer {
- return object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val now = ctx.clock.millis()
- val fragment = fragment ?: return SimResourceCommand.Exit
- val usage = fragment.usage / fragment.cores
- val work = (fragment.duration / 1000) * usage
- val deadline = offset + fragment.duration
-
- assert(deadline >= now) { "Deadline already passed" }
-
- val cmd =
- if (cpu.id < fragment.cores && work > 0.0)
- SimResourceCommand.Consume(work, usage, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ offset = ctx.interpreter.clock.millis()
- if (barrier.enter()) {
- this@SimTraceWorkload.fragment = nextFragment()
- this@SimTraceWorkload.offset += fragment.duration
- }
+ val lifecycle = SimWorkloadLifecycle(ctx)
- return cmd
- }
+ for (cpu in ctx.cpus) {
+ cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model)))
}
}
@@ -87,6 +67,31 @@ public class SimTraceWorkload(public val trace: Sequence<Fragment>) : SimWorkloa
}
}
+ private inner class Consumer(val cpu: ProcessingUnit) : SimResourceConsumer {
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ val now = ctx.clock.millis()
+ val fragment = fragment ?: return SimResourceCommand.Exit
+ val usage = fragment.usage / fragment.cores
+ val work = (fragment.duration / 1000) * usage
+ val deadline = offset + fragment.duration
+
+ assert(deadline >= now) { "Deadline already passed" }
+
+ val cmd =
+ if (cpu.id < fragment.cores && work > 0.0)
+ SimResourceCommand.Consume(work, usage, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+
+ if (barrier.enter()) {
+ this@SimTraceWorkload.fragment = nextFragment()
+ this@SimTraceWorkload.offset += fragment.duration
+ }
+
+ return cmd
+ }
+ }
+
/**
* A fragment of the workload.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
index bdc12bb5..b80665fa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt
@@ -23,8 +23,6 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.resources.SimResourceConsumer
/**
* A model that characterizes the runtime behavior of some particular workload.
@@ -35,11 +33,8 @@ import org.opendc.simulator.resources.SimResourceConsumer
public interface SimWorkload {
/**
* This method is invoked when the workload is started.
+ *
+ * @param ctx The execution context in which the machine runs.
*/
public fun onStart(ctx: SimMachineContext)
-
- /**
- * Obtain the resource consumer for the specified processing unit.
- */
- public fun getConsumer(ctx: SimMachineContext, cpu: ProcessingUnit): SimResourceConsumer
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index 8886caa7..b15692ec 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -31,15 +31,16 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertDoesNotThrow
import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* Test suite for the [SimHypervisor] class.
@@ -93,8 +94,9 @@ internal class SimHypervisorTest {
),
)
- val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
+ val platform = SimResourceInterpreter(coroutineContext, clock)
+ val machine = SimBareMetalMachine(platform, model, SimplePowerDriver(ConstantPowerModel(0.0)))
+ val hypervisor = SimFairShareHypervisor(platform, scalingGovernor = PerformanceScalingGovernor(), listener = listener)
launch {
machine.run(hypervisor)
@@ -164,11 +166,11 @@ internal class SimHypervisorTest {
)
)
+ val platform = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, model, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
+ val hypervisor = SimFairShareHypervisor(platform, listener = listener)
launch {
machine.run(hypervisor)
@@ -190,10 +192,33 @@ internal class SimHypervisorTest {
yield()
assertAll(
- { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
+ { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") },
+ { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") },
{ assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
+
+ @Test
+ fun testMultipleCPUs() = runBlockingSimulation {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+ val model = SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+
+ val platform = SimResourceInterpreter(coroutineContext, clock)
+ val machine = SimBareMetalMachine(
+ platform, model, SimplePowerDriver(ConstantPowerModel(0.0))
+ )
+ val hypervisor = SimFairShareHypervisor(platform)
+
+ assertDoesNotThrow {
+ launch {
+ machine.run(hypervisor)
+ }
+ }
+
+ machine.close()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 205f2eca..69f562d2 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -29,14 +29,14 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* Test suite for the [SimBareMetalMachine] class.
@@ -57,7 +57,11 @@ class SimMachineTest {
@Test
fun testFlopsWorkload() = runBlockingSimulation {
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
@@ -76,7 +80,11 @@ class SimMachineTest {
cpus = List(cpuNode.coreCount * 2) { ProcessingUnit(cpuNode, it % 2, 1000.0) },
memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
)
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
try {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
@@ -90,7 +98,11 @@ class SimMachineTest {
@Test
fun testUsage() = runBlockingSimulation {
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
val res = mutableListOf<Double>()
val job = launch { machine.usage.toList(res) }
@@ -99,7 +111,7 @@ class SimMachineTest {
machine.run(SimFlopsWorkload(2_000, utilization = 1.0))
yield()
job.cancel()
- assertEquals(listOf(0.0, 0.5, 1.0, 0.5, 0.0), res) { "Machine is fully utilized" }
+ assertEquals(listOf(0.0, 1.0, 0.0), res) { "Machine is fully utilized" }
} finally {
machine.close()
}
@@ -107,7 +119,11 @@ class SimMachineTest {
@Test
fun testClose() = runBlockingSimulation {
- val machine = SimBareMetalMachine(coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
+ val machine = SimBareMetalMachine(
+ SimResourceInterpreter(coroutineContext, clock),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0))
+ )
machine.close()
assertDoesNotThrow { machine.close() }
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
index ef6f536d..dba3e9a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -30,16 +30,16 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.compute.cpufreq.PerformanceScalingGovernor
-import org.opendc.simulator.compute.cpufreq.SimpleScalingDriver
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceInterpreter
/**
* A test suite for the [SimSpaceSharedHypervisor].
@@ -76,11 +76,11 @@ internal class SimSpaceSharedHypervisorTest {
),
)
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ SimResourceInterpreter(coroutineContext, clock), machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
val colA = launch { machine.usage.toList(usagePm) }
launch { machine.run(hypervisor) }
@@ -112,11 +112,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testRuntimeWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimRuntimeWorkload(duration)
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -135,11 +135,11 @@ internal class SimSpaceSharedHypervisorTest {
fun testFlopsWorkload() = runBlockingSimulation {
val duration = 5 * 60L * 1000
val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0)
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -156,11 +156,11 @@ internal class SimSpaceSharedHypervisorTest {
@Test
fun testTwoWorkloads() = runBlockingSimulation {
val duration = 5 * 60L * 1000
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -182,11 +182,11 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
@@ -206,11 +206,11 @@ internal class SimSpaceSharedHypervisorTest {
*/
@Test
fun testConcurrentWorkloadSucceeds() = runBlockingSimulation {
+ val interpreter = SimResourceInterpreter(coroutineContext, clock)
val machine = SimBareMetalMachine(
- coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
- SimpleScalingDriver(ConstantPowerModel(0.0))
+ interpreter, machineModel, SimplePowerDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimSpaceSharedHypervisor()
+ val hypervisor = SimSpaceSharedHypervisor(interpreter)
launch { machine.run(hypervisor) }
yield()
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt
index c482d348..8e8b09c8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PerformanceScalingGovernorTest.kt
@@ -26,23 +26,24 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.Test
+import org.opendc.simulator.compute.SimProcessingUnit
/**
- * Test suite for the [DemandScalingGovernor]
+ * Test suite for the [PerformanceScalingGovernor]
*/
-internal class DemandScalingGovernorTest {
+internal class PerformanceScalingGovernorTest {
@Test
- fun testSetDemandLimit() {
- val ctx = mockk<ScalingContext>(relaxUnitFun = true)
+ fun testSetStartLimit() {
+ val cpu = mockk<SimProcessingUnit>(relaxUnitFun = true)
- every { ctx.cpu.speed } returns 2100.0
+ every { cpu.model.frequency } returns 4100.0
+ every { cpu.speed } returns 2100.0
- val logic = DemandScalingGovernor().createLogic(ctx)
+ val logic = PerformanceScalingGovernor().createLogic(cpu)
logic.onStart()
- verify(exactly = 0) { ctx.setTarget(any()) }
+ logic.onLimit(1.0)
- logic.onLimit()
- verify(exactly = 1) { ctx.setTarget(2100.0) }
+ verify(exactly = 1) { cpu.capacity = 4100.0 }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
index bbea3ee2..35fd7c4c 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/power/PStatePowerDriverTest.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.compute.power
import io.mockk.every
import io.mockk.mockk
@@ -28,18 +28,16 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.compute.SimBareMetalMachine
import org.opendc.simulator.compute.SimProcessingUnit
-import org.opendc.simulator.compute.power.ConstantPowerModel
-import org.opendc.simulator.compute.power.LinearPowerModel
/**
- * Test suite for [PStateScalingDriver].
+ * Test suite for [PStatePowerDriver].
*/
-internal class PStateScalingDriverTest {
+internal class PStatePowerDriverTest {
@Test
- fun testPowerWithoutGovernor() {
+ fun testPowerBaseline() {
val machine = mockk<SimBareMetalMachine>()
- val driver = PStateScalingDriver(
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to ConstantPowerModel(200.0),
3300.0 to ConstantPowerModel(300.0),
@@ -47,19 +45,19 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
+ val logic = driver.createLogic(machine, emptyList())
assertEquals(200.0, logic.computePower())
}
@Test
- fun testPowerWithSingleGovernor() {
+ fun testPowerWithSingleCpu() {
val machine = mockk<SimBareMetalMachine>()
val cpu = mockk<SimProcessingUnit>()
- every { cpu.model.frequency } returns 4100.0
+ every { cpu.capacity } returns 3200.0
every { cpu.speed } returns 1200.0
- val driver = PStateScalingDriver(
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to ConstantPowerModel(200.0),
3300.0 to ConstantPowerModel(300.0),
@@ -67,23 +65,26 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
-
- val scalingContext = logic.createContext(cpu)
- scalingContext.setTarget(3200.0)
+ val logic = driver.createLogic(machine, listOf(cpu))
assertEquals(300.0, logic.computePower())
}
@Test
- fun testPowerWithMultipleGovernors() {
+ fun testPowerWithMultipleCpus() {
val machine = mockk<SimBareMetalMachine>()
- val cpu = mockk<SimProcessingUnit>()
+ val cpus = listOf(
+ mockk<SimProcessingUnit>(),
+ mockk()
+ )
- every { cpu.model.frequency } returns 4100.0
- every { cpu.speed } returns 1200.0
+ every { cpus[0].capacity } returns 1000.0
+ every { cpus[0].speed } returns 1200.0
- val driver = PStateScalingDriver(
+ every { cpus[1].capacity } returns 3500.0
+ every { cpus[1].speed } returns 1200.0
+
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to ConstantPowerModel(200.0),
3300.0 to ConstantPowerModel(300.0),
@@ -91,13 +92,7 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
-
- val scalingContextA = logic.createContext(cpu)
- scalingContextA.setTarget(1000.0)
-
- val scalingContextB = logic.createContext(cpu)
- scalingContextB.setTarget(3400.0)
+ val logic = driver.createLogic(machine, cpus)
assertEquals(350.0, logic.computePower())
}
@@ -109,7 +104,7 @@ internal class PStateScalingDriverTest {
every { cpu.model.frequency } returns 4200.0
- val driver = PStateScalingDriver(
+ val driver = PStatePowerDriver(
sortedMapOf(
2800.0 to LinearPowerModel(200.0, 100.0),
3300.0 to LinearPowerModel(250.0, 150.0),
@@ -117,16 +112,14 @@ internal class PStateScalingDriverTest {
)
)
- val logic = driver.createLogic(machine)
-
- val scalingContext = logic.createContext(cpu)
+ val logic = driver.createLogic(machine, listOf(cpu))
every { cpu.speed } returns 1400.0
- scalingContext.setTarget(1400.0)
+ every { cpu.capacity } returns 1400.0
assertEquals(150.0, logic.computePower())
every { cpu.speed } returns 1400.0
- scalingContext.setTarget(4000.0)
+ every { cpu.capacity } returns 4000.0
assertEquals(235.0, logic.computePower())
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index cd5f33bd..b45b2a2f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: SimResourceScheduler
+ private lateinit var interpreter: SimResourceInterpreter
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
+ interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
@@ -67,7 +67,7 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkSource(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, scheduler)
+ val provider = SimResourceSource(4200.0, interpreter)
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -75,7 +75,7 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, scheduler)
+ val provider = SimResourceSource(4200.0, interpreter)
val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
@@ -85,12 +85,12 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(scheduler)
+ val switch = SimResourceSwitchMaxMin(interpreter)
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -98,14 +98,14 @@ class SimResourceBenchmarks {
@Benchmark
fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(scheduler)
+ val switch = SimResourceSwitchMaxMin(interpreter)
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
- repeat(3) { i ->
+ repeat(3) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -117,10 +117,10 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -130,12 +130,12 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, scheduler))
- switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
+ switch.addInput(SimResourceSource(3000.0, interpreter))
repeat(2) {
launch {
- val provider = switch.addOutput(3500.0)
+ val provider = switch.newOutput()
provider.consume(SimTraceConsumer(state.trace))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 653b53e0..5fe7d7bb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -25,7 +25,10 @@ package org.opendc.simulator.resources
/**
* Abstract implementation of [SimResourceAggregator].
*/
-public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator {
+public abstract class SimAbstractResourceAggregator(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem?
+) : SimResourceAggregator {
/**
* This method is invoked when the resource consumer consumes resources.
*/
@@ -39,7 +42,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
/**
* This method is invoked when the resource consumer finishes processing.
*/
- protected abstract fun doFinish(cause: Throwable?)
+ protected abstract fun doFinish()
/**
* This method is invoked when an input context is started.
@@ -51,8 +54,9 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
*/
protected abstract fun onInputFinished(input: Input)
+ /* SimResourceAggregator */
override fun addInput(input: SimResourceProvider) {
- check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
+ check(state != SimResourceState.Stopped) { "Aggregator has been stopped" }
val consumer = Consumer()
_inputs.add(input)
@@ -60,42 +64,75 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
input.startConsumer(consumer)
}
- override fun close() {
- output.close()
- }
-
- override val output: SimResourceProvider
- get() = _output
- private val _output = SimResourceForwarder()
-
override val inputs: Set<SimResourceProvider>
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
private val _inputConsumers = mutableListOf<Consumer>()
- protected val outputContext: SimResourceContext
- get() = context
- private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) {
- override val remainingWork: Double
- get() {
- val now = clock.millis()
-
- return if (_remainingWorkFlush < now) {
- _remainingWorkFlush = now
- _inputConsumers.sumOf { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it }
- } else {
- _remainingWork
+ /* SimResourceProvider */
+ override val state: SimResourceState
+ get() = _output.state
+
+ override val capacity: Double
+ get() = _output.capacity
+
+ override val speed: Double
+ get() = _output.speed
+
+ override val demand: Double
+ get() = _output.demand
+
+ override val counters: SimResourceCounters
+ get() = _output.counters
+
+ override fun startConsumer(consumer: SimResourceConsumer) {
+ _output.startConsumer(consumer)
+ }
+
+ override fun cancel() {
+ _output.cancel()
+ }
+
+ override fun interrupt() {
+ _output.interrupt()
+ }
+
+ override fun close() {
+ _output.close()
+ }
+
+ private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
+ override fun createLogic(): SimResourceProviderLogic {
+ return object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
+ doIdle(deadline)
+ return Long.MAX_VALUE
}
- }
- private var _remainingWork: Double = 0.0
- private var _remainingWorkFlush: Long = Long.MIN_VALUE
- override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ doConsume(work, limit, deadline)
+ return Long.MAX_VALUE
+ }
- override fun onIdle(deadline: Long) = doIdle(deadline)
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ doFinish()
+ }
- override fun onFinish() {
- doFinish(null)
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+
+ override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return _inputConsumers.sumOf { it.remainingWork }
+ }
+ }
+ }
+
+ /**
+ * Flush the progress of the output if possible.
+ */
+ fun flush() {
+ ctx?.flush()
}
}
@@ -123,7 +160,13 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
*/
override val ctx: SimResourceContext
get() = _ctx!!
- var _ctx: SimResourceContext? = null
+ private var _ctx: SimResourceContext? = null
+
+ /**
+ * The remaining work of the consumer.
+ */
+ val remainingWork: Double
+ get() = _ctx?.remainingWork ?: 0.0
/**
* The resource command to run next.
@@ -132,7 +175,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
private fun updateCapacity() {
// Adjust capacity of output resource
- context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
+ _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
}
/* Input */
@@ -149,7 +192,8 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
this.command = null
next
} else {
- context.flush(isIntermediate = true)
+ _output.flush()
+
next = command
this.command = null
next ?: SimResourceCommand.Idle()
@@ -162,11 +206,6 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe
_ctx = ctx
updateCapacity()
- // Make sure we initialize the output if we have not done so yet
- if (context.state == SimResourceState.Pending) {
- context.start()
- }
-
onInputStarted(this)
}
SimResourceEvent.Capacity -> updateCapacity()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
deleted file mode 100644
index c03bfad5..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import java.time.Clock
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
- */
-public abstract class SimAbstractResourceContext(
- initialCapacity: Double,
- private val scheduler: SimResourceScheduler,
- private val consumer: SimResourceConsumer
-) : SimResourceContext, SimResourceFlushable {
-
- /**
- * The clock of the context.
- */
- public override val clock: Clock
- get() = scheduler.clock
-
- /**
- * The capacity of the resource.
- */
- public final override var capacity: Double = initialCapacity
- set(value) {
- val oldValue = field
-
- // Only changes will be propagated
- if (value != oldValue) {
- field = value
- onCapacityChange()
- }
- }
-
- /**
- * The amount of work still remaining at this instant.
- */
- override val remainingWork: Double
- get() {
- val activeCommand = activeCommand ?: return 0.0
- val now = clock.millis()
-
- return if (_remainingWorkFlush < now) {
- _remainingWorkFlush = now
- computeRemainingWork(activeCommand, now).also { _remainingWork = it }
- } else {
- _remainingWork
- }
- }
- private var _remainingWork: Double = 0.0
- private var _remainingWorkFlush: Long = Long.MIN_VALUE
-
- /**
- * A flag to indicate the state of the context.
- */
- public var state: SimResourceState = SimResourceState.Pending
- private set
-
- /**
- * The current processing speed of the resource.
- */
- final override var speed: Double = 0.0
- private set
-
- /**
- * This method is invoked when the resource will idle until the specified [deadline].
- */
- public abstract fun onIdle(deadline: Long)
-
- /**
- * This method is invoked when the resource will be consumed until the specified [work] was processed or the
- * [deadline] was reached.
- */
- public abstract fun onConsume(work: Double, limit: Double, deadline: Long)
-
- /**
- * This method is invoked when the resource consumer has finished.
- */
- public abstract fun onFinish()
-
- /**
- * Get the remaining work to process after a resource consumption.
- *
- * @param work The size of the resource consumption.
- * @param speed The speed of consumption.
- * @param duration The duration from the start of the consumption until now.
- * @return The amount of work remaining.
- */
- protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
- return if (duration > 0L) {
- val processed = duration / 1000.0 * speed
- max(0.0, work - processed)
- } else {
- 0.0
- }
- }
-
- /**
- * Start the consumer.
- */
- public fun start() {
- check(state == SimResourceState.Pending) { "Consumer is already started" }
-
- val now = clock.millis()
-
- state = SimResourceState.Active
- isProcessing = true
- latestFlush = now
-
- try {
- consumer.onEvent(this, SimResourceEvent.Start)
- activeCommand = interpret(consumer.onNext(this), now)
- } catch (cause: Throwable) {
- doFail(cause)
- } finally {
- isProcessing = false
- }
- }
-
- /**
- * Immediately stop the consumer.
- */
- public fun stop() {
- try {
- isProcessing = true
- latestFlush = clock.millis()
-
- flush(isIntermediate = true)
- doStop()
- } finally {
- isProcessing = false
- }
- }
-
- override fun flush(isIntermediate: Boolean) {
- // Flush is no-op when the consumer is finished or not yet started
- if (state != SimResourceState.Active) {
- return
- }
-
- val now = clock.millis()
-
- // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it.
- if (isIntermediate && latestFlush >= now) {
- return
- }
-
- try {
- val activeCommand = activeCommand ?: return
- val (timestamp, command) = activeCommand
-
- // Note: accessor is reliant on activeCommand being set
- val remainingWork = remainingWork
-
- isProcessing = true
-
- val duration = now - timestamp
- assert(duration >= 0) { "Flush in the past" }
-
- this.activeCommand = when (command) {
- is SimResourceCommand.Idle -> {
- // We should only continue processing the next command if:
- // 1. The resource consumer reached its deadline.
- // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if (command.deadline <= now || !isIntermediate) {
- next(now)
- } else {
- interpret(SimResourceCommand.Idle(command.deadline), now)
- }
- }
- is SimResourceCommand.Consume -> {
- // We should only continue processing the next command if:
- // 1. The resource consumption was finished.
- // 2. The resource capacity cannot satisfy the demand.
- // 4. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) {
- next(now)
- } else {
- interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now)
- }
- }
- SimResourceCommand.Exit ->
- // Flush may not be called when the resource consumer has finished
- throw IllegalStateException()
- }
-
- // Flush remaining work cache
- _remainingWorkFlush = Long.MIN_VALUE
- } catch (cause: Throwable) {
- doFail(cause)
- } finally {
- latestFlush = now
- isProcessing = false
- }
- }
-
- override fun interrupt() {
- // Prevent users from interrupting the resource while they are constructing their next command, as this will
- // only lead to infinite recursion.
- if (isProcessing) {
- return
- }
-
- scheduler.schedule(this, isIntermediate = false)
- }
-
- override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
-
- /**
- * A flag to indicate that the resource is currently processing a command.
- */
- private var isProcessing: Boolean = false
-
- /**
- * The current command that is being processed.
- */
- private var activeCommand: CommandWrapper? = null
-
- /**
- * The latest timestamp at which the resource was flushed.
- */
- private var latestFlush: Long = Long.MIN_VALUE
-
- /**
- * Finish the consumer and resource provider.
- */
- private fun doStop() {
- val state = state
- this.state = SimResourceState.Stopped
-
- if (state == SimResourceState.Active) {
- activeCommand = null
- try {
- consumer.onEvent(this, SimResourceEvent.Exit)
- onFinish()
- } catch (cause: Throwable) {
- doFail(cause)
- }
- }
- }
-
- /**
- * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
- */
- private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? {
- when (command) {
- is SimResourceCommand.Idle -> {
- val deadline = command.deadline
-
- require(deadline >= now) { "Deadline already passed" }
-
- speed = 0.0
-
- onIdle(deadline)
- consumer.onEvent(this, SimResourceEvent.Run)
- }
- is SimResourceCommand.Consume -> {
- val work = command.work
- val limit = command.limit
- val deadline = command.deadline
-
- require(deadline >= now) { "Deadline already passed" }
-
- speed = min(capacity, limit)
- onConsume(work, limit, deadline)
- consumer.onEvent(this, SimResourceEvent.Run)
- }
- is SimResourceCommand.Exit -> {
- speed = 0.0
-
- doStop()
-
- // No need to set the next active command
- return null
- }
- }
-
- return CommandWrapper(now, command)
- }
-
- /**
- * Request the workload for more work.
- */
- private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now)
-
- /**
- * Compute the remaining work based on the specified [wrapper] and [timestamp][now].
- */
- private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double {
- val (timestamp, command) = wrapper
- val duration = now - timestamp
- return when (command) {
- is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration)
- is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0
- }
- }
-
- /**
- * Fail the resource consumer.
- */
- private fun doFail(cause: Throwable) {
- state = SimResourceState.Stopped
- activeCommand = null
-
- try {
- consumer.onFailure(this, cause)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- e.printStackTrace()
- }
-
- onFinish()
- }
-
- /**
- * Indicate that the capacity of the resource has changed.
- */
- private fun onCapacityChange() {
- // Do not inform the consumer if it has not been started yet
- if (state != SimResourceState.Active) {
- return
- }
-
- val isThrottled = speed > capacity
-
- consumer.onEvent(this, SimResourceEvent.Capacity)
-
- // Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
- // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
- if (isThrottled) {
- flush(isIntermediate = true)
- }
- }
-
- /**
- * This class wraps a [command] with the timestamp it was started and possibly the task associated with it.
- */
- private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand)
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
new file mode 100644
index 00000000..de26f99e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
+/**
+ * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations.
+ */
+public abstract class SimAbstractResourceProvider(
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem?,
+ initialCapacity: Double
+) : SimResourceProvider {
+ /**
+ * The capacity of the resource.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
+ /**
+ * The [SimResourceControllableContext] that is currently running.
+ */
+ protected var ctx: SimResourceControllableContext? = null
+ private set
+
+ /**
+ * The state of the resource provider.
+ */
+ final override var state: SimResourceState = SimResourceState.Pending
+ private set
+
+ /**
+ * Construct the [SimResourceProviderLogic] instance for a new consumer.
+ */
+ protected abstract fun createLogic(): SimResourceProviderLogic
+
+ /**
+ * Start the specified [SimResourceControllableContext].
+ */
+ protected open fun start(ctx: SimResourceControllableContext) {
+ ctx.start()
+ }
+
+ /**
+ * Update the counters of the resource provider.
+ */
+ protected fun updateCounters(ctx: SimResourceContext, work: Double) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += work
+ counters.actual += work - remainingWork
+ counters.overcommit += remainingWork
+ }
+
+ final override fun startConsumer(consumer: SimResourceConsumer) {
+ check(state == SimResourceState.Pending) { "Resource is in invalid state" }
+ val ctx = interpreter.newContext(consumer, createLogic(), parent)
+
+ ctx.capacity = capacity
+ this.ctx = ctx
+ this.state = SimResourceState.Active
+
+ start(ctx)
+ }
+
+ override fun close() {
+ cancel()
+ state = SimResourceState.Stopped
+ }
+
+ final override fun interrupt() {
+ ctx?.interrupt()
+ }
+
+ final override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.close()
+ }
+
+ if (state != SimResourceState.Stopped) {
+ state = SimResourceState.Pending
+ }
+ }
+
+ override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]"
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
index bb4e6a2c..00972f43 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt
@@ -25,24 +25,14 @@ package org.opendc.simulator.resources
/**
* A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource.
*/
-public interface SimResourceAggregator : AutoCloseable {
- /**
- * The output resource provider to which resource consumers can be attached.
- */
- public val output: SimResourceProvider
-
+public interface SimResourceAggregator : SimResourceProvider {
/**
* The input resources that will be switched between the output providers.
*/
public val inputs: Set<SimResourceProvider>
/**
- * Add the specified [input] to the switch.
+ * Add the specified [input] to the aggregator.
*/
public fun addInput(input: SimResourceProvider)
-
- /**
- * End the lifecycle of the aggregator.
- */
- public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 5665abd1..c39c1aca 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -25,7 +25,10 @@ package org.opendc.simulator.resources
/**
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
-public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) {
+public class SimResourceAggregatorMaxMin(
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null
+) : SimAbstractResourceAggregator(interpreter, parent) {
private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
@@ -35,7 +38,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
val inputCapacity = input.ctx.capacity
- val fraction = inputCapacity / outputContext.capacity
+ val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
val grantedWork = fraction * work
@@ -53,7 +56,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA
}
}
- override fun doFinish(cause: Throwable?) {
+ override fun doFinish() {
val iterator = consumers.iterator()
for (input in iterator) {
iterator.remove()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 7c76c634..0d9a6106 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -45,6 +45,11 @@ public interface SimResourceContext {
public val speed: Double
/**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
* The amount of work still remaining at this instant.
*/
public val remainingWork: Double
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
index cf0bbb28..ceaca39a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/SimpleScalingDriver.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
@@ -20,30 +20,45 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
-
-import org.opendc.simulator.compute.SimMachine
-import org.opendc.simulator.compute.SimProcessingUnit
-import org.opendc.simulator.compute.power.PowerModel
+package org.opendc.simulator.resources
/**
- * A [ScalingDriver] that ignores the instructions of the [ScalingGovernor] and directly computes the power consumption
- * based on the specified [power model][model].
+ * A controllable [SimResourceContext].
+ *
+ * This interface is used by resource providers to control the resource context.
*/
-public class SimpleScalingDriver(private val model: PowerModel) : ScalingDriver {
- override fun createLogic(machine: SimMachine): ScalingDriver.Logic = object : ScalingDriver.Logic {
- override fun createContext(cpu: SimProcessingUnit): ScalingContext {
- return object : ScalingContext {
- override val machine: SimMachine = machine
+public interface SimResourceControllableContext : SimResourceContext, AutoCloseable {
+ /**
+ * The state of the resource context.
+ */
+ public val state: SimResourceState
+
+ /**
+ * The capacity of the resource.
+ */
+ public override var capacity: Double
- override val cpu: SimProcessingUnit = cpu
+ /**
+ * Start the resource context.
+ */
+ public fun start()
- override fun setTarget(freq: Double) {}
- }
- }
+ /**
+ * Stop the resource context.
+ */
+ public override fun close()
- override fun computePower(): Double = model.computePower(machine.usage.value)
+ /**
+ * Invalidate the resource context's state.
+ *
+ * By invalidating the resource context's current state, the state is re-computed and the current progress is
+ * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the
+ * synchronous variant.
+ */
+ public fun invalidate()
- override fun toString(): String = "SimpleScalingDriver.Logic"
- }
+ /**
+ * Synchronously flush the progress of the resource context and materialize its current progress.
+ */
+ public fun flush()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
index f6a1a42e..725aa5bc 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt
@@ -23,15 +23,26 @@
package org.opendc.simulator.resources
/**
- * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer.
+ * An interface that tracks cumulative counts of the work performed by a resource.
*/
-public interface SimResourceFlushable {
+public interface SimResourceCounters {
/**
- * Flush the current active resource consumption.
- *
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
+ * The amount of work that resource consumers wanted the resource to perform.
*/
- public fun flush(isIntermediate: Boolean)
+ public val demand: Double
+
+ /**
+ * The amount of work performed by the resource.
+ */
+ public val actual: Double
+
+ /**
+ * The amount of work that could not be completed due to overcommitted resources.
+ */
+ public val overcommit: Double
+
+ /**
+ * Reset the resource counters.
+ */
+ public fun reset()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
index b2759b7f..e0333ff9 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt
@@ -25,19 +25,14 @@ package org.opendc.simulator.resources
/**
* A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers.
*/
-public interface SimResourceDistributor : AutoCloseable {
+public interface SimResourceDistributor : SimResourceConsumer {
/**
* The output resource providers to which resource consumers can be attached.
*/
public val outputs: Set<SimResourceProvider>
/**
- * The input resource that will be distributed over the consumers.
+ * Create a new output for the distributor.
*/
- public val input: SimResourceProvider
-
- /**
- * Add an output to the switch with the specified [capacity].
- */
- public fun addOutput(capacity: Double): SimResourceProvider
+ public fun newOutput(): SimResourceProvider
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index a76cb1e3..be9e89fb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -29,23 +29,22 @@ import kotlin.math.min
* A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing.
*/
public class SimResourceDistributorMaxMin(
- override val input: SimResourceProvider,
- private val scheduler: SimResourceScheduler,
- private val listener: Listener? = null
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
get() = _outputs
- private val _outputs = mutableSetOf<OutputProvider>()
+ private val _outputs = mutableSetOf<Output>()
/**
- * The active output contexts.
+ * The resource context of the consumer.
*/
- private val outputContexts: MutableList<OutputContext> = mutableListOf()
+ private var ctx: SimResourceContext? = null
/**
- * The total speed requested by the output resources.
+ * The active outputs.
*/
- private var totalRequestedSpeed = 0.0
+ private val activeOutputs: MutableList<Output> = mutableListOf()
/**
* The total amount of work requested by the output resources.
@@ -57,147 +56,83 @@ public class SimResourceDistributorMaxMin(
*/
private var totalAllocatedSpeed = 0.0
- /**
- * The total allocated work requested for the output resources.
- */
- private var totalAllocatedWork = 0.0
-
- /**
- * The amount of work that could not be performed due to over-committing resources.
- */
- private var totalOvercommittedWork = 0.0
-
- /**
- * The amount of work that was lost due to interference.
- */
- private var totalInterferedWork = 0.0
-
- /**
- * A flag to indicate that the switch is closed.
- */
- private var isClosed: Boolean = false
-
- /**
- * An internal [SimResourceConsumer] implementation for switch inputs.
- */
- private val consumer = object : SimResourceConsumer {
- /**
- * The resource context of the consumer.
- */
- private lateinit var ctx: SimResourceContext
-
- val remainingWork: Double
- get() = ctx.remainingWork
-
- override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- return doNext(ctx.capacity)
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- }
- SimResourceEvent.Exit -> {
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
-
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
-
- output.close()
- }
- }
- else -> {}
- }
- }
- }
-
- /**
- * The total amount of remaining work.
- */
- private val totalRemainingWork: Double
- get() = consumer.remainingWork
-
- override fun addOutput(capacity: Double): SimResourceProvider {
- check(!isClosed) { "Distributor has been closed" }
-
- val provider = OutputProvider(capacity)
+ /* SimResourceDistributor */
+ override fun newOutput(): SimResourceProvider {
+ val provider = Output(ctx?.capacity ?: 0.0)
_outputs.add(provider)
return provider
}
- override fun close() {
- if (!isClosed) {
- isClosed = true
- input.cancel()
- }
+ /* SimResourceConsumer */
+ override fun onNext(ctx: SimResourceContext): SimResourceCommand {
+ return doNext(ctx.capacity)
}
- init {
- input.startConsumer(consumer)
- }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ updateCapacity(ctx)
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
- /**
- * Indicate that the workloads should be re-scheduled.
- */
- private fun schedule() {
- input.interrupt()
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ SimResourceEvent.Capacity -> updateCapacity(ctx)
+ else -> {}
+ }
}
/**
- * Schedule the work over the physical CPUs.
+ * Schedule the work of the outputs.
*/
- private fun doSchedule(capacity: Double): SimResourceCommand {
- // If there is no work yet, mark all inputs as idle.
- if (outputContexts.isEmpty()) {
+ private fun doNext(capacity: Double): SimResourceCommand {
+ // If there is no work yet, mark the input as idle.
+ if (activeOutputs.isEmpty()) {
return SimResourceCommand.Idle()
}
- val maxUsage = capacity
var duration: Double = Double.MAX_VALUE
var deadline: Long = Long.MAX_VALUE
- var availableSpeed = maxUsage
+ var availableSpeed = capacity
var totalRequestedSpeed = 0.0
var totalRequestedWork = 0.0
- // Flush the work of the outputs
- var outputIterator = outputContexts.listIterator()
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
-
- output.flush(isIntermediate = true)
+ // Pull in the work of the outputs
+ val outputIterator = activeOutputs.listIterator()
+ for (output in outputIterator) {
+ output.pull()
- if (output.activeCommand == SimResourceCommand.Exit) {
- // Apparently the output consumer has exited, so remove it from the scheduling queue.
+ // Remove outputs that have finished
+ if (output.isFinished) {
outputIterator.remove()
}
}
- // Sort the outputs based on their requested usage
- // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set
- outputContexts.sort()
+ // Sort in-place the outputs based on their requested usage.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeOutputs.sort()
// Divide the available input capacity fairly across the outputs using max-min fair sharing
- outputIterator = outputContexts.listIterator()
- var remaining = outputContexts.size
- while (outputIterator.hasNext()) {
- val output = outputIterator.next()
+ var remaining = activeOutputs.size
+ for (output in activeOutputs) {
val availableShare = availableSpeed / remaining--
when (val command = output.activeCommand) {
is SimResourceCommand.Idle -> {
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
-
output.actualSpeed = 0.0
}
is SimResourceCommand.Consume -> {
val grantedSpeed = min(output.allowedSpeed, availableShare)
-
- // Take into account the minimum deadline of this slice before we possible continue
deadline = min(deadline, command.deadline)
// Ignore idle computation
@@ -212,216 +147,139 @@ public class SimResourceDistributorMaxMin(
output.actualSpeed = grantedSpeed
availableSpeed -= grantedSpeed
- // The duration that we want to run is that of the shortest request from an output
+ // The duration that we want to run is that of the shortest request of an output
duration = min(duration, command.work / grantedSpeed)
}
SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" }
}
}
- assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" }
+ assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" }
- this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
- this.totalAllocatedSpeed = maxUsage - availableSpeed
- this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration)
+ this.totalAllocatedSpeed = capacity - availableSpeed
+ val totalAllocatedWork = min(
+ totalRequestedWork,
+ totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration)
+ )
return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline)
+ SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline)
else
SimResourceCommand.Idle(deadline)
}
- /**
- * Obtain the next command to perform.
- */
- private fun doNext(capacity: Double): SimResourceCommand {
- val totalRequestedWork = totalRequestedWork.toLong()
- val totalRemainingWork = totalRemainingWork.toLong()
- val totalAllocatedWork = totalAllocatedWork.toLong()
- val totalRequestedSpeed = totalRequestedSpeed
- val totalAllocatedSpeed = totalAllocatedSpeed
-
- // Force all inputs to re-schedule their work.
- val command = doSchedule(capacity)
-
- // Report metrics
- listener?.onSliceFinish(
- this,
- totalRequestedWork,
- totalAllocatedWork - totalRemainingWork,
- totalOvercommittedWork.toLong(),
- totalInterferedWork.toLong(),
- totalAllocatedSpeed,
- totalRequestedSpeed
- )
-
- totalInterferedWork = 0.0
- totalOvercommittedWork = 0.0
-
- return command
+ private fun updateCapacity(ctx: SimResourceContext) {
+ for (output in _outputs) {
+ output.capacity = ctx.capacity
+ }
}
/**
- * Event listener for hypervisor events.
+ * An internal [SimResourceProvider] implementation for switch outputs.
*/
- public interface Listener {
+ private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable<Output> {
/**
- * This method is invoked when a slice is finished.
+ * The current command that is processed by the resource.
*/
- public fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
+ var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
- /**
- * An internal [SimResourceProvider] implementation for switch outputs.
- */
- private inner class OutputProvider(val capacity: Double) : SimResourceProvider {
/**
- * The [OutputContext] that is currently running.
+ * The processing speed that is allowed by the model constraints.
*/
- private var ctx: OutputContext? = null
+ var allowedSpeed: Double = 0.0
- override var state: SimResourceState = SimResourceState.Pending
- internal set
+ /**
+ * The actual processing speed.
+ */
+ var actualSpeed: Double = 0.0
- override fun startConsumer(consumer: SimResourceConsumer) {
- check(state == SimResourceState.Pending) { "Resource cannot be consumed" }
+ /**
+ * A flag to indicate that the output is finished.
+ */
+ val isFinished
+ get() = activeCommand is SimResourceCommand.Exit
- val ctx = OutputContext(this, consumer)
- this.ctx = ctx
- this.state = SimResourceState.Active
- outputContexts += ctx
+ /**
+ * The timestamp at which we received the last command.
+ */
+ private var lastCommandTimestamp: Long = Long.MIN_VALUE
- ctx.start()
- schedule()
- }
+ /* SimAbstractResourceProvider */
+ override fun createLogic(): SimResourceProviderLogic = this
- override fun close() {
- cancel()
+ override fun start(ctx: SimResourceControllableContext) {
+ activeOutputs += this
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Stopped
- _outputs.remove(this)
+ interpreter.batch {
+ ctx.start()
+ // Interrupt the input to re-schedule the resources
+ this@SimResourceDistributorMaxMin.ctx?.interrupt()
}
}
- override fun interrupt() {
- ctx?.interrupt()
- }
+ override fun close() {
+ val state = state
- override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.stop()
- }
+ super.close()
if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
+ _outputs.remove(this)
}
}
- }
-
- /**
- * A [SimAbstractResourceContext] for the output resources.
- */
- private inner class OutputContext(
- private val provider: OutputProvider,
- consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> {
- /**
- * The current command that is processed by the vCPU.
- */
- var activeCommand: SimResourceCommand = SimResourceCommand.Idle()
-
- /**
- * The processing speed that is allowed by the model constraints.
- */
- var allowedSpeed: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- var actualSpeed: Double = 0.0
-
- private fun reportOvercommit() {
- val remainingWork = remainingWork
- totalOvercommittedWork += remainingWork
- }
-
- override fun onIdle(deadline: Long) {
- reportOvercommit()
+ /* SimResourceProviderLogic */
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
allowedSpeed = 0.0
activeCommand = SimResourceCommand.Idle(deadline)
- }
+ lastCommandTimestamp = ctx.clock.millis()
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- reportOvercommit()
+ return Long.MAX_VALUE
+ }
- allowedSpeed = speed
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ allowedSpeed = ctx.speed
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
+ lastCommandTimestamp = ctx.clock.millis()
+
+ return Long.MAX_VALUE
}
- override fun onFinish() {
- reportOvercommit()
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
+ override fun onFinish(ctx: SimResourceControllableContext) {
activeCommand = SimResourceCommand.Exit
- provider.cancel()
+ lastCommandTimestamp = ctx.clock.millis()
}
- override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
- // Apply performance interference model
- val performanceScore = 1.0
+ override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0
- // Compute the remaining amount of work
return if (work > 0.0) {
- // Compute the fraction of compute time allocated to the VM
+ // Compute the fraction of compute time allocated to the output
val fraction = actualSpeed / totalAllocatedSpeed
- // Compute the work that was actually granted to the VM.
- val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction
- val processed = processingAvailable * performanceScore
-
- val interferedWork = processingAvailable - processed
-
- totalInterferedWork += interferedWork
-
- max(0.0, work - processed)
+ // Compute the work that was actually granted to the output.
+ val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction
+ max(0.0, work - processingAvailable)
} else {
0.0
}
}
- private var isProcessing: Boolean = false
-
- override fun interrupt() {
- // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
- // to infinite recursion.
- if (isProcessing) {
- return
- }
-
- try {
- isProcessing = false
+ /* Comparable */
+ override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed)
- super.interrupt()
-
- // Force the scheduler to re-schedule
- schedule()
- } finally {
- isProcessing = true
+ /**
+ * Pull the next command if necessary.
+ */
+ fun pull() {
+ val ctx = ctx
+ if (ctx != null && lastCommandTimestamp < ctx.clock.millis()) {
+ ctx.flush()
}
}
-
- override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
new file mode 100644
index 00000000..82631377
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * The resource interpreter is responsible for managing the interaction between resource consumer and provider.
+ *
+ * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation
+ * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
+ */
+public interface SimResourceInterpreter {
+ /**
+ * The [Clock] associated with this interpreter.
+ */
+ public val clock: Clock
+
+ /**
+ * Create a new [SimResourceControllableContext] with the given [provider].
+ *
+ * @param consumer The consumer logic.
+ * @param provider The logic of the resource provider.
+ * @param parent The system to which the resource context belongs.
+ */
+ public fun newContext(
+ consumer: SimResourceConsumer,
+ provider: SimResourceProviderLogic,
+ parent: SimResourceSystem? = null
+ ): SimResourceControllableContext
+
+ /**
+ * Start batching the execution of resource updates until [popBatch] is called.
+ *
+ * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
+ * simultaneously) in a single state update.
+ *
+ * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
+ * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
+ * the same amount of times. To simplify batching, see [batch].
+ */
+ public fun pushBatch()
+
+ /**
+ * Stop the batching of resource updates and run the interpreter on the batch.
+ *
+ * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
+ */
+ public fun popBatch()
+
+ public companion object {
+ /**
+ * Construct a new [SimResourceInterpreter] implementation.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+ @JvmName("create")
+ public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter {
+ return SimResourceInterpreterImpl(context, clock)
+ }
+ }
+}
+
+/**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+public inline fun SimResourceInterpreter.batch(block: () -> Unit) {
+ try {
+ pushBatch()
+ block()
+ } finally {
+ popBatch()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 2f567a5e..f709ca17 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -27,7 +27,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
/**
- * A [SimResourceProvider] provides some resource of type [R].
+ * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer].
*/
public interface SimResourceProvider : AutoCloseable {
/**
@@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable {
public val state: SimResourceState
/**
+ * The resource capacity available at this instant.
+ */
+ public val capacity: Double
+
+ /**
+ * The current processing speed of the resource.
+ */
+ public val speed: Double
+
+ /**
+ * The resource processing speed demand at this instant.
+ */
+ public val demand: Double
+
+ /**
+ * The resource counters to track the execution metrics of the resource.
+ */
+ public val counters: SimResourceCounters
+
+ /**
* Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously.
*
* @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
new file mode 100644
index 00000000..5231ecf5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import kotlin.math.max
+
+/**
+ * The logic of a resource provider.
+ */
+public interface SimResourceProviderLogic {
+ /**
+ * This method is invoked when the resource is reported to idle until the specified [deadline].
+ *
+ * @param ctx The context in which the provider runs.
+ * @param deadline The deadline that was requested by the resource consumer.
+ * @return The instant at which to resume the consumer.
+ */
+ public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long
+
+ /**
+ * This method is invoked when the resource will be consumed until the specified amount of [work] was processed
+ * or [deadline] is reached.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param work The amount of work that was requested by the resource consumer.
+ * @param limit The limit on the work rate of the resource consumer.
+ * @param deadline The deadline that was requested by the resource consumer.
+ * @return The instant at which to resume the consumer.
+ */
+ public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long
+
+ /**
+ * This method is invoked when the progress of the resource consumer is materialized.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param work The amount of work that was requested by the resource consumer.
+ */
+ public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {}
+
+ /**
+ * This method is invoked when the resource consumer has finished.
+ */
+ public fun onFinish(ctx: SimResourceControllableContext)
+
+ /**
+ * Get the remaining work to process after a resource consumption.
+ *
+ * @param work The size of the resource consumption.
+ * @param speed The speed of consumption.
+ * @param duration The duration from the start of the consumption until now.
+ * @return The amount of work remaining.
+ */
+ public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double {
+ return if (duration > 0L) {
+ val processed = duration / 1000.0 * speed
+ max(0.0, work - processed)
+ } else {
+ 0.0
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
deleted file mode 100644
index a228c47b..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import java.time.Clock
-
-/**
- * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource
- * providers and consumers.
- *
- * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more
- * efficiently, reducing the overall work needed per update.
- */
-public interface SimResourceScheduler {
- /**
- * The [Clock] associated with this scheduler.
- */
- public val clock: Clock
-
- /**
- * Schedule a direct interrupt for the resource context represented by [flushable].
- *
- * @param flushable The resource context that needs to be flushed.
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false)
-
- /**
- * Schedule an interrupt in the future for the resource context represented by [flushable].
- *
- * This method will override earlier calls to this method for the same [flushable].
- *
- * @param flushable The resource context that needs to be flushed.
- * @param timestamp The timestamp when the interrupt should happen.
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false)
-
- /**
- * Batch the execution of several interrupts into a single call.
- *
- * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
- */
- public fun batch(block: () -> Unit)
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
deleted file mode 100644
index cdbb4a6c..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
-import java.util.ArrayDeque
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after.
- *
- * @param clock The virtual simulation clock.
- */
-public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler {
- /**
- * The [TimerScheduler] to actually schedule the interrupts.
- */
- private val timers = TimerScheduler<Any>(context, clock)
-
- /**
- * A flag to indicate that an interrupt is currently running already.
- */
- private var isRunning: Boolean = false
-
- /**
- * The queue of resources to be flushed.
- */
- private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>()
-
- override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) {
- queue.add(flushable to isIntermediate)
-
- if (isRunning) {
- return
- }
-
- flush()
- }
-
- override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) {
- timers.startSingleTimerTo(flushable, timestamp) {
- schedule(flushable, isIntermediate)
- }
- }
-
- override fun batch(block: () -> Unit) {
- val wasAlreadyRunning = isRunning
- try {
- isRunning = true
- block()
- } finally {
- if (!wasAlreadyRunning) {
- isRunning = false
- }
- }
- }
-
- /**
- * Flush the scheduled queue.
- */
- private fun flush() {
- val visited = mutableSetOf<SimResourceFlushable>()
- try {
- isRunning = true
- while (queue.isNotEmpty()) {
- val (flushable, isIntermediate) = queue.poll()
- flushable.flush(isIntermediate)
- visited.add(flushable)
- }
- } finally {
- isRunning = false
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 3277b889..9f062cc3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -26,98 +26,39 @@ import kotlin.math.ceil
import kotlin.math.min
/**
- * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
+ * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param scheduler The scheduler to schedule the interrupts.
+ * @param interpreter The interpreter that is used for managing the resource contexts.
+ * @param parent The parent resource system.
*/
public class SimResourceSource(
initialCapacity: Double,
- private val scheduler: SimResourceScheduler
-) : SimResourceProvider {
- /**
- * The current processing speed of the resource.
- */
- public val speed: Double
- get() = ctx?.speed ?: 0.0
-
- /**
- * The capacity of the resource.
- */
- public var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
- /**
- * The [Context] that is currently running.
- */
- private var ctx: Context? = null
-
- override var state: SimResourceState = SimResourceState.Pending
- private set
-
- override fun startConsumer(consumer: SimResourceConsumer) {
- check(state == SimResourceState.Pending) { "Resource is in invalid state" }
- val ctx = Context(consumer)
-
- this.ctx = ctx
- this.state = SimResourceState.Active
-
- ctx.start()
- }
-
- override fun close() {
- cancel()
- state = SimResourceState.Stopped
- }
-
- override fun interrupt() {
- ctx?.interrupt()
- }
-
- override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.stop()
- }
-
- if (state != SimResourceState.Stopped) {
- state = SimResourceState.Pending
- }
- }
-
- /**
- * Internal implementation of [SimResourceContext] for this class.
- */
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) {
- override fun onIdle(deadline: Long) {
- // Do not resume if deadline is "infinite"
- if (deadline != Long.MAX_VALUE) {
- scheduler.schedule(this, deadline)
+ private val interpreter: SimResourceInterpreter,
+ private val parent: SimResourceSystem? = null
+) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) {
+ override fun createLogic(): SimResourceProviderLogic {
+ return object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long {
+ return deadline
}
- }
-
- override fun onConsume(work: Double, limit: Double, deadline: Long) {
- val until = min(deadline, clock.millis() + getDuration(work, speed))
- scheduler.schedule(this, until)
- }
- override fun onFinish() {
- cancel()
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long {
+ return min(deadline, ctx.clock.millis() + getDuration(work, speed))
+ }
- ctx = null
+ override fun onUpdate(ctx: SimResourceControllableContext, work: Double) {
+ updateCounters(ctx, work)
+ }
- if (this@SimResourceSource.state != SimResourceState.Stopped) {
- this@SimResourceSource.state = SimResourceState.Pending
+ override fun onFinish(ctx: SimResourceControllableContext) {
+ cancel()
}
}
-
- override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
}
+ override fun toString(): String = "SimResourceSource[capacity=$capacity]"
+
/**
* Compute the duration that a resource consumption will take with the specified [speed].
*/
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
index 53fec16a..e224285e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt
@@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable {
public val inputs: Set<SimResourceProvider>
/**
- * Add an output to the switch with the specified [capacity].
+ * The resource counters to track the execution metrics of all switch resources.
*/
- public fun addOutput(capacity: Double): SimResourceProvider
+ public val counters: SimResourceCounters
+
+ /**
+ * Create a new output on the switch.
+ */
+ public fun newOutput(): SimResourceProvider
/**
* Add the specified [input] to the switch.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 1a9dd0bc..2950af80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
override val inputs: Set<SimResourceProvider>
get() = _inputs
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override val counters: SimResourceCounters = object : SimResourceCounters {
+ override val demand: Double
+ get() = _inputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _inputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _inputs.sumOf { it.counters.overcommit }
+
+ override fun reset() {
+ for (input in _inputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
check(availableResources.isNotEmpty()) { "No capacity to serve request" }
val forwarder = availableResources.poll()
- val output = Provider(capacity, forwarder)
+ val output = Provider(forwarder)
_outputs += output
return output
}
@@ -84,13 +101,9 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
_inputs.forEach(SimResourceProvider::cancel)
}
- private inner class Provider(
- private val capacity: Double,
- private val forwarder: SimResourceTransformer
- ) : SimResourceProvider by forwarder {
+ private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder {
override fun close() {
// We explicitly do not close the forwarder here in order to re-use it across output resources.
-
_outputs -= this
availableResources += forwarder
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index 5dc1e68d..684a1b52 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -22,23 +22,31 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.*
-
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
public class SimResourceSwitchMaxMin(
- scheduler: SimResourceScheduler,
- private val listener: Listener? = null
+ interpreter: SimResourceInterpreter,
+ parent: SimResourceSystem? = null
) : SimResourceSwitch {
- private val _outputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The output resource providers to which resource consumers can be attached.
+ */
override val outputs: Set<SimResourceProvider>
- get() = _outputs
+ get() = distributor.outputs
- private val _inputs = mutableSetOf<SimResourceProvider>()
+ /**
+ * The input resources that will be switched between the output providers.
+ */
override val inputs: Set<SimResourceProvider>
- get() = _inputs
+ get() = aggregator.inputs
+
+ /**
+ * The resource counters to track the execution metrics of all switch resources.
+ */
+ override val counters: SimResourceCounters
+ get() = aggregator.counters
/**
* A flag to indicate that the switch was closed.
@@ -48,37 +56,24 @@ public class SimResourceSwitchMaxMin(
/**
* The aggregator to aggregate the resources.
*/
- private val aggregator = SimResourceAggregatorMaxMin(scheduler)
+ private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent)
/**
* The distributor to distribute the aggregated resources.
*/
- private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, scheduler,
- object : SimResourceDistributorMaxMin.Listener {
- override fun onSliceFinish(
- switch: SimResourceDistributor,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand)
- }
- }
- )
+ private val distributor = SimResourceDistributorMaxMin(interpreter, parent)
+
+ init {
+ aggregator.startConsumer(distributor)
+ }
/**
- * Add an output to the switch represented by [resource].
+ * Add an output to the switch.
*/
- override fun addOutput(capacity: Double): SimResourceProvider {
+ override fun newOutput(): SimResourceProvider {
check(!isClosed) { "Switch has been closed" }
- val provider = distributor.addOutput(capacity)
- _outputs.add(provider)
- return provider
+ return distributor.newOutput()
}
/**
@@ -93,26 +88,7 @@ public class SimResourceSwitchMaxMin(
override fun close() {
if (!isClosed) {
isClosed = true
- distributor.close()
aggregator.close()
}
}
-
- /**
- * Event listener for hypervisor events.
- */
- public interface Listener {
- /**
- * This method is invoked when a slice is finished.
- */
- public fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- )
- }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt
new file mode 100644
index 00000000..609262cb
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A system of possible multiple sub-resources.
+ *
+ * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the
+ * resource provider.
+ */
+public interface SimResourceSystem {
+ /**
+ * The parent system to which this system belongs or `null` if it has no parent.
+ */
+ public val parent: SimResourceSystem?
+
+ /**
+ * This method is invoked when the system has converged to a steady-state.
+ *
+ * @param timestamp The timestamp at which the system converged.
+ */
+ public fun onConverge(timestamp: Long)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 32f3f573..fd3d1230 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.resources
+import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+
/**
* A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
*
@@ -53,6 +55,19 @@ public class SimResourceTransformer(
override var state: SimResourceState = SimResourceState.Pending
private set
+ override val capacity: Double
+ get() = ctx?.capacity ?: 0.0
+
+ override val speed: Double
+ get() = ctx?.speed ?: 0.0
+
+ override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ override val counters: SimResourceCounters
+ get() = _counters
+ private val _counters = SimResourceCountersImpl()
+
override fun startConsumer(consumer: SimResourceConsumer) {
check(state == SimResourceState.Pending) { "Resource is in invalid state" }
@@ -97,10 +112,15 @@ public class SimResourceTransformer(
start()
}
+ updateCounters(ctx)
+
return if (state == SimResourceState.Stopped) {
SimResourceCommand.Exit
} else if (delegate != null) {
val command = transform(ctx, delegate.onNext(ctx))
+
+ _work = if (command is SimResourceCommand.Consume) command.work else 0.0
+
if (command == SimResourceCommand.Exit) {
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
@@ -169,6 +189,22 @@ public class SimResourceTransformer(
state = SimResourceState.Pending
}
}
+
+ /**
+ * Counter to track the current submitted work.
+ */
+ private var _work = 0.0
+
+ /**
+ * Update the resource counters for the transformer.
+ */
+ private fun updateCounters(ctx: SimResourceContext) {
+ val counters = _counters
+ val remainingWork = ctx.remainingWork
+ counters.demand += _work
+ counters.actual += _work - remainingWork
+ counters.overcommit += remainingWork
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
new file mode 100644
index 00000000..46c5c63f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -0,0 +1,422 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.impl
+
+import org.opendc.simulator.resources.*
+import java.time.Clock
+import kotlin.math.min
+
+/**
+ * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
+ */
+internal class SimResourceContextImpl(
+ override val parent: SimResourceSystem?,
+ private val interpreter: SimResourceInterpreterImpl,
+ private val consumer: SimResourceConsumer,
+ private val logic: SimResourceProviderLogic
+) : SimResourceControllableContext, SimResourceSystem {
+ /**
+ * The clock of the context.
+ */
+ override val clock: Clock
+ get() = interpreter.clock
+
+ /**
+ * The capacity of the resource.
+ */
+ override var capacity: Double = 0.0
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
+
+ /**
+ * The amount of work still remaining at this instant.
+ */
+ override val remainingWork: Double
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ computeRemainingWork(now).also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
+
+ /**
+ * A flag to indicate the state of the context.
+ */
+ override val state: SimResourceState
+ get() = _state
+ private var _state = SimResourceState.Pending
+
+ /**
+ * The current processing speed of the resource.
+ */
+ override val speed: Double
+ get() = _speed
+ private var _speed = 0.0
+
+ /**
+ * The current resource processing demand.
+ */
+ override val demand: Double
+ get() = _limit
+
+ private val counters = object : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ }
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ /**
+ * The current state of the resource context.
+ */
+ private var _timestamp: Long = Long.MIN_VALUE
+ private var _work: Double = 0.0
+ private var _limit: Double = 0.0
+ private var _deadline: Long = Long.MAX_VALUE
+
+ /**
+ * The update flag indicating why the update was triggered.
+ */
+ private var _flag: Flag = Flag.None
+
+ /**
+ * The current pending update.
+ */
+ private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null
+
+ override fun start() {
+ check(_state == SimResourceState.Pending) { "Consumer is already started" }
+ interpreter.batch {
+ consumer.onEvent(this, SimResourceEvent.Start)
+ _state = SimResourceState.Active
+ interrupt()
+ }
+ }
+
+ override fun close() {
+ if (_state != SimResourceState.Stopped) {
+ interpreter.batch {
+ _state = SimResourceState.Stopped
+ doStop()
+ }
+ }
+ }
+
+ override fun interrupt() {
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ enableFlag(Flag.Interrupt)
+ scheduleUpdate()
+ }
+
+ override fun invalidate() {
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ enableFlag(Flag.Invalidate)
+ scheduleUpdate()
+ }
+
+ override fun flush() {
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ interpreter.scheduleSync(this)
+ }
+
+ /**
+ * Determine whether the state of the resource context should be updated.
+ */
+ fun requiresUpdate(timestamp: Long): Boolean {
+ // Either the resource context is flagged or there is a pending update at this timestamp
+ return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp
+ }
+
+ /**
+ * Update the state of the resource context.
+ */
+ fun doUpdate(timestamp: Long) {
+ try {
+ val oldState = _state
+ val newState = doUpdate(timestamp, oldState)
+
+ _state = newState
+ _flag = Flag.None
+
+ when (newState) {
+ SimResourceState.Pending ->
+ if (oldState != SimResourceState.Pending) {
+ throw IllegalStateException("Illegal transition to pending state")
+ }
+ SimResourceState.Stopped ->
+ if (oldState != SimResourceState.Stopped) {
+ doStop()
+ }
+ else -> {}
+ }
+ } catch (cause: Throwable) {
+ doFail(cause)
+ } finally {
+ _remainingWorkFlush = Long.MIN_VALUE
+ _timestamp = timestamp
+ }
+ }
+
+ override fun onConverge(timestamp: Long) {
+ if (_state == SimResourceState.Active) {
+ consumer.onEvent(this, SimResourceEvent.Run)
+ }
+ }
+
+ override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]"
+
+ /**
+ * Update the state of the resource context.
+ */
+ private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState {
+ return when (state) {
+ // Resource context is not active, so its state will not update
+ SimResourceState.Pending, SimResourceState.Stopped -> state
+ SimResourceState.Active -> {
+ val isInterrupted = _flag == Flag.Interrupt
+ val remainingWork = remainingWork
+ val isConsume = _limit > 0.0
+
+ // Update the resource counters only if there is some progress
+ if (timestamp > _timestamp) {
+ logic.onUpdate(this, _work)
+ }
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource capacity cannot satisfy the demand.
+ // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) {
+ next(timestamp)
+ } else if (isConsume) {
+ interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp)
+ } else {
+ interpret(SimResourceCommand.Idle(_deadline), timestamp)
+ }
+ }
+ }
+ }
+
+ /**
+ * Stop the resource context.
+ */
+ private fun doStop() {
+ try {
+ consumer.onEvent(this, SimResourceEvent.Exit)
+ logic.onFinish(this)
+ } catch (cause: Throwable) {
+ doFail(cause)
+ }
+ }
+
+ /**
+ * Fail the resource consumer.
+ */
+ private fun doFail(cause: Throwable) {
+ try {
+ consumer.onFailure(this, cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ e.printStackTrace()
+ }
+
+ logic.onFinish(this)
+ }
+
+ /**
+ * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer.
+ */
+ private fun interpret(command: SimResourceCommand, now: Long): SimResourceState {
+ return when (command) {
+ is SimResourceCommand.Idle -> {
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ _speed = 0.0
+ _work = 0.0
+ _limit = 0.0
+ _deadline = deadline
+
+ val timestamp = logic.onIdle(this, deadline)
+ scheduleUpdate(timestamp)
+
+ SimResourceState.Active
+ }
+ is SimResourceCommand.Consume -> {
+ val work = command.work
+ val limit = command.limit
+ val deadline = command.deadline
+
+ require(deadline >= now) { "Deadline already passed" }
+
+ _speed = min(capacity, limit)
+ _work = work
+ _limit = limit
+ _deadline = deadline
+
+ val timestamp = logic.onConsume(this, work, limit, deadline)
+ scheduleUpdate(timestamp)
+
+ SimResourceState.Active
+ }
+ is SimResourceCommand.Exit -> {
+ _speed = 0.0
+ _work = 0.0
+ _limit = 0.0
+ _deadline = Long.MAX_VALUE
+
+ SimResourceState.Stopped
+ }
+ }
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now)
+
+ /**
+ * Compute the remaining work based on the current state.
+ */
+ private fun computeRemainingWork(now: Long): Double {
+ return if (_work > 0.0)
+ logic.getRemainingWork(this, _work, speed, now - _timestamp)
+ else 0.0
+ }
+
+ /**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (state != SimResourceState.Active) {
+ return
+ }
+
+ val isThrottled = speed > capacity
+
+ interpreter.batch {
+ // Inform the consumer of the capacity change. This might already trigger an interrupt.
+ consumer.onEvent(this, SimResourceEvent.Capacity)
+
+ // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command.
+ if (isThrottled) {
+ invalidate()
+ }
+ }
+ }
+
+ /**
+ * Enable the specified [flag] taking into account precedence.
+ */
+ private fun enableFlag(flag: Flag) {
+ _flag = when (_flag) {
+ Flag.None -> flag
+ Flag.Invalidate ->
+ when (flag) {
+ Flag.None -> flag
+ else -> flag
+ }
+ Flag.Interrupt ->
+ when (flag) {
+ Flag.None, Flag.Invalidate -> flag
+ else -> flag
+ }
+ }
+ }
+
+ /**
+ * Schedule an update for this resource context.
+ */
+ private fun scheduleUpdate() {
+ // Cancel the pending update
+ val pendingUpdate = _pendingUpdate
+ if (pendingUpdate != null) {
+ _pendingUpdate = null
+ pendingUpdate.cancel()
+ }
+
+ interpreter.scheduleImmediate(this)
+ }
+
+ /**
+ * Schedule a delayed update for this resource context.
+ */
+ private fun scheduleUpdate(timestamp: Long) {
+ val pendingUpdate = _pendingUpdate
+ if (pendingUpdate != null) {
+ if (pendingUpdate.timestamp == timestamp) {
+ // Fast-path: A pending update for the same timestamp already exists
+ return
+ } else {
+ // Cancel the old pending update
+ _pendingUpdate = null
+ pendingUpdate.cancel()
+ }
+ }
+
+ if (timestamp != Long.MAX_VALUE) {
+ _pendingUpdate = interpreter.scheduleDelayed(this, timestamp)
+ }
+ }
+
+ /**
+ * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or
+ * interrupted.
+ */
+ enum class Flag {
+ None,
+ Interrupt,
+ Invalidate
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
index ddbe1ca0..827019c5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt
@@ -20,17 +20,23 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.cpufreq
+package org.opendc.simulator.resources.impl
+
+import org.opendc.simulator.resources.SimResourceCounters
/**
- * A CPUFreq [ScalingGovernor] that requests the frequency based on the utilization of the machine.
+ * Mutable implementation of the [SimResourceCounters] interface.
*/
-public class DemandScalingGovernor : ScalingGovernor {
- override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
- override fun onLimit() {
- ctx.setTarget(ctx.cpu.speed)
- }
+internal class SimResourceCountersImpl : SimResourceCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
- override fun toString(): String = "DemandScalingGovernor.Logic"
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
}
+
+ override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
new file mode 100644
index 00000000..cb0d6160
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -0,0 +1,331 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources.impl
+
+import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import org.opendc.simulator.resources.*
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter {
+ /**
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The queue of resource updates that are scheduled for immediate execution.
+ */
+ private val queue = ArrayDeque<Update>()
+
+ /**
+ * A priority queue containing the resource updates to be scheduled in the future.
+ */
+ private val futureQueue = PriorityQueue<Update>()
+
+ /**
+ * The stack of interpreter invocations to occur in the future.
+ */
+ private val futureInvocations = ArrayDeque<Invocation>()
+
+ /**
+ * The systems that have been visited during the interpreter cycle.
+ */
+ private val visited = linkedSetOf<SimResourceSystem>()
+
+ /**
+ * The index in the batch stack.
+ */
+ private var batchIndex = 0
+
+ /**
+ * A flag to indicate that the interpreter is currently active.
+ */
+ private val isRunning: Boolean
+ get() = batchIndex > 0
+
+ /**
+ * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle.
+ *
+ * This method should be used when the state of a resource context is invalidated/interrupted and needs to be
+ * re-computed. In case no interpreter is currently active, the interpreter will be started.
+ */
+ fun scheduleImmediate(ctx: SimResourceContextImpl) {
+ queue.add(Update(ctx, Long.MIN_VALUE))
+
+ // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active interpreter.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Update the specified [ctx] synchronously.
+ */
+ fun scheduleSync(ctx: SimResourceContextImpl) {
+ ctx.doUpdate(clock.millis())
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
+ }
+
+ // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active interpreter.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Schedule the interpreter to run at [timestamp] to update the resource contexts.
+ *
+ * This method will override earlier calls to this method for the same [ctx].
+ *
+ * @param ctx The resource context to which the event applies.
+ * @param timestamp The timestamp when the interrupt should happen.
+ */
+ fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update {
+ val now = clock.millis()
+ val futureQueue = futureQueue
+
+ require(timestamp >= now) { "Timestamp must be in the future" }
+
+ val update = Update(ctx, timestamp)
+ futureQueue.add(update)
+
+ // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference.
+ if (futureQueue.peek() === update) {
+ trySchedule(futureQueue, futureInvocations)
+ }
+
+ return update
+ }
+
+ override fun newContext(
+ consumer: SimResourceConsumer,
+ provider: SimResourceProviderLogic,
+ parent: SimResourceSystem?
+ ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider)
+
+ override fun pushBatch() {
+ batchIndex++
+ }
+
+ override fun popBatch() {
+ try {
+ // Flush the work if the platform is not already running
+ if (batchIndex == 1 && queue.isNotEmpty()) {
+ runInterpreter()
+ }
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Interpret all actions that are scheduled for the current timestamp.
+ */
+ private fun runInterpreter() {
+ val now = clock.millis()
+ val queue = queue
+ val futureQueue = futureQueue
+ val futureInvocations = futureInvocations
+ val visited = visited
+
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ val update = futureQueue.peek() ?: break
+
+ assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" }
+
+ if (update.timestamp > now && !update.isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(futureQueue, futureInvocations)
+ break
+ }
+
+ futureQueue.poll()
+
+ if (update(now) && visited.add(update.ctx)) {
+ collectAncestors(update.ctx, visited)
+ }
+ }
+
+ // Repeat execution of all immediate updates until the system has converged to a steady-state
+ // We have to take into account that the onConverge callback can also trigger new actions.
+ do {
+ // Execute all immediate updates
+ while (true) {
+ val update = queue.poll() ?: break
+ if (update(now) && visited.add(update.ctx)) {
+ collectAncestors(update.ctx, visited)
+ }
+ }
+
+ for (system in visited) {
+ system.onConverge(now)
+ }
+
+ visited.clear()
+ } while (queue.isNotEmpty())
+ }
+
+ /**
+ * Try to schedule the next interpreter event.
+ */
+ private fun trySchedule(queue: PriorityQueue<Update>, scheduled: ArrayDeque<Invocation>) {
+ val nextTimer = queue.peek()
+ val now = clock.millis()
+
+ // Check whether we need to update our schedule:
+ if (nextTimer == null) {
+ // Case 1: all timers are cancelled
+ for (invocation in scheduled) {
+ invocation.cancel()
+ }
+ scheduled.clear()
+ return
+ }
+
+ while (true) {
+ val invocation = scheduled.peekFirst()
+ if (invocation == null || invocation.timestamp > nextTimer.timestamp) {
+ // Case 2: A new timer was registered ahead of the other timers.
+ // Solution: Schedule a new scheduler invocation
+ val nextTimestamp = nextTimer.timestamp
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(
+ nextTimestamp - now,
+ {
+ try {
+ batchIndex++
+ runInterpreter()
+ } finally {
+ batchIndex--
+ }
+ },
+ context
+ )
+ scheduled.addFirst(Invocation(nextTimestamp, handle))
+ break
+ } else if (invocation.timestamp < nextTimer.timestamp) {
+ // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted
+ // Solution: Cancel the next scheduler invocation
+ invocation.cancel()
+ scheduled.pollFirst()
+ } else {
+ break
+ }
+ }
+ }
+
+ /**
+ * Collect all the ancestors of the specified [system].
+ */
+ private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) {
+ val parent = system.parent
+ if (parent != null) {
+ systems.add(parent)
+ collectAncestors(parent, systems)
+ }
+ }
+
+ /**
+ * A future interpreter invocation.
+ *
+ * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case
+ * the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private data class Invocation(
+ @JvmField val timestamp: Long,
+ @JvmField private val disposableHandle: DisposableHandle
+ ) {
+ /**
+ * Cancel the interpreter invocation.
+ */
+ fun cancel() = disposableHandle.dispose()
+ }
+
+ /**
+ * An update call for [ctx] that is scheduled for [timestamp].
+ *
+ * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be
+ * cancelled if the resource context was invalidated in the meantime.
+ */
+ class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable<Update> {
+ /**
+ * A flag to indicate that the task has been cancelled.
+ */
+ @JvmField
+ var isCancelled: Boolean = false
+
+ /**
+ * Cancel the update.
+ */
+ fun cancel() {
+ isCancelled = true
+ }
+
+ /**
+ * Immediately run update.
+ */
+ operator fun invoke(timestamp: Long): Boolean {
+ val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp)
+ if (shouldExecute) {
+ ctx.doUpdate(timestamp)
+ }
+ return shouldExecute
+ }
+
+ override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp)
+
+ override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index 2b32300e..51024e80 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceAggregatorMaxMin] class.
@@ -41,7 +42,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimResourceAggregatorMaxMinTest {
@Test
fun testSingleCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val forwarder = SimResourceForwarder()
@@ -58,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest {
source.startConsumer(adapter)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertAll(
@@ -66,13 +67,13 @@ internal class SimResourceAggregatorMaxMinTest {
{ assertEquals(listOf(0.0, 0.5, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testDoubleCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -86,20 +87,20 @@ internal class SimResourceAggregatorMaxMinTest {
val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
try {
- aggregator.output.consume(adapter)
+ aggregator.consume(adapter)
yield()
assertAll(
{ assertEquals(1000, clock.millis()) },
{ assertEquals(listOf(0.0, 2.0, 0.0), usage) }
)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testOvercommit() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -114,19 +115,19 @@ internal class SimResourceAggregatorMaxMinTest {
.andThen(SimResourceCommand.Exit)
try {
- aggregator.output.consume(consumer)
+ aggregator.consume(consumer)
yield()
assertEquals(1000, clock.millis())
verify(exactly = 2) { consumer.onNext(any()) }
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testException() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -141,17 +142,17 @@ internal class SimResourceAggregatorMaxMinTest {
.andThenThrows(IllegalStateException("Test Exception"))
try {
- assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
+ assertThrows<IllegalStateException> { aggregator.consume(consumer) }
yield()
assertEquals(SimResourceState.Pending, sources[0].state)
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -163,20 +164,20 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(4.0, 1.0)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(1000)
sources[0].capacity = 0.5
}
yield()
assertEquals(2334, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
}
}
@Test
fun testFailOverCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
@@ -188,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(1.0, 0.5)
try {
coroutineScope {
- launch { aggregator.output.consume(consumer) }
+ launch { aggregator.consume(consumer) }
delay(500)
sources[0].capacity = 0.5
}
yield()
assertEquals(1000, clock.millis())
} finally {
- aggregator.output.close()
+ aggregator.close()
+ }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
+ val sources = listOf(
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
+ )
+ sources.forEach(aggregator::addInput)
+
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) }
+ .returns(SimResourceCommand.Consume(4.0, 4.0, 1000))
+ .andThen(SimResourceCommand.Exit)
+
+ try {
+ aggregator.consume(consumer)
+ yield()
+ assertEquals(1000, clock.millis())
+ assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" }
+ } finally {
+ aggregator.close()
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 2e2d6588..6cb507ce 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -26,98 +26,109 @@ import io.mockk.*
import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.impl.SimResourceContextImpl
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
- * A test suite for the [SimAbstractResourceContext] class.
+ * A test suite for the [SimResourceContextImpl] class.
*/
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {}
+ val logic = object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
}
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
- context.flush(isIntermediate = false)
+ context.doUpdate(interpreter.clock.millis())
}
@Test
fun testIntermediateFlush() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onFinish() {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ val logic = spyk(object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
})
+ val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
- context.flush(isIntermediate = true)
+ context.doUpdate(interpreter.clock.millis())
- verify(exactly = 2) { context.onConsume(any(), any(), any()) }
+ verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) }
}
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onFinish() {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ val logic = spyk(object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
})
+ val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic))
context.start()
delay(5)
- context.flush(isIntermediate = true)
+ context.invalidate()
delay(5)
- context.flush(isIntermediate = true)
+ context.invalidate()
assertAll(
- { verify(exactly = 2) { context.onIdle(any()) } },
- { verify(exactly = 1) { context.onFinish() } }
+ { verify(exactly = 2) { logic.onIdle(any(), any()) } },
+ { verify(exactly = 1) { logic.onFinish(any()) } }
)
}
@Test
fun testDoubleStart() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onFinish() {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ val logic = object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
}
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
- assertThrows<IllegalStateException> { context.start() }
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
}
@Test
fun testIdempodentCapacityChange() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {}
+ val logic = object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
}
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ context.capacity = 4200.0
context.start()
context.capacity = 4200.0
@@ -126,17 +137,19 @@ class SimResourceContextTest {
@Test
fun testFailureNoInfiniteLoop() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Exit
every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
- val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
- override fun onIdle(deadline: Long) {}
- override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish() {}
- }
+ val logic = spyk(object : SimResourceProviderLogic {
+ override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline
+ override fun onFinish(ctx: SimResourceControllableContext) {}
+ override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline
+ })
+
+ val context = SimResourceContextImpl(null, interpreter, consumer, logic)
context.start()
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 5e86088d..08d88093 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceSource] class.
@@ -40,7 +41,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
class SimResourceSourceTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -63,7 +64,7 @@ class SimResourceSourceTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val provider = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -83,7 +84,7 @@ class SimResourceSourceTest {
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -110,7 +111,7 @@ class SimResourceSourceTest {
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -133,7 +134,7 @@ class SimResourceSourceTest {
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
lateinit var resCtx: SimResourceContext
@@ -174,7 +175,7 @@ class SimResourceSourceTest {
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -193,7 +194,7 @@ class SimResourceSourceTest {
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -213,7 +214,7 @@ class SimResourceSourceTest {
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -236,7 +237,7 @@ class SimResourceSourceTest {
@Test
fun testClosedConsumption() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -257,7 +258,7 @@ class SimResourceSourceTest {
@Test
fun testCloseDuringConsumption() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -279,7 +280,7 @@ class SimResourceSourceTest {
@Test
fun testIdle() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -301,7 +302,7 @@ class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
@@ -321,7 +322,7 @@ class SimResourceSourceTest {
@Test
fun testIncorrectDeadline() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val capacity = 4200.0
val provider = SimResourceSource(capacity, scheduler)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 32b6d8ad..ad8d82e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -44,7 +45,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val speed = mutableListOf<Double>()
@@ -66,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest {
source.startConsumer(adapter)
switch.addInput(forwarder)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -86,7 +87,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -97,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -113,7 +114,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer {
@@ -141,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- val provider = switch.addOutput(3200.0)
+ val provider = switch.newOutput()
try {
provider.consume(workload)
@@ -158,7 +159,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -169,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest {
switch.addInput(source)
- switch.addOutput(3200.0)
- assertThrows<IllegalStateException> { switch.addOutput(3200.0) }
+ switch.newOutput()
+ assertThrows<IllegalStateException> { switch.newOutput() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index e7dec172..e4292ec0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimTraceConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* Test suite for the [SimResourceSwitch] implementations
@@ -40,13 +41,13 @@ import org.opendc.simulator.resources.consumer.SimTraceConsumer
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val switch = SimResourceSwitchMaxMin(scheduler)
val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
- val provider = switch.addOutput(1000.0)
+ val provider = switch.newOutput()
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit
@@ -64,27 +65,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
-
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L
val workload =
@@ -97,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(scheduler, listener)
- val provider = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val provider = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -109,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest {
}
assertAll(
- { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -121,27 +102,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
-
- val listener = object : SimResourceSwitchMaxMin.Listener {
- var totalRequestedWork = 0L
- var totalGrantedWork = 0L
- var totalOvercommittedWork = 0L
-
- override fun onSliceFinish(
- switch: SimResourceSwitchMaxMin,
- requestedWork: Long,
- grantedWork: Long,
- overcommittedWork: Long,
- interferedWork: Long,
- cpuUsage: Double,
- cpuDemand: Double
- ) {
- totalRequestedWork += requestedWork
- totalGrantedWork += grantedWork
- totalOvercommittedWork += overcommittedWork
- }
- }
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val duration = 5 * 60L
val workloadA =
@@ -163,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(scheduler, listener)
- val providerA = switch.addOutput(3200.0)
- val providerB = switch.addOutput(3200.0)
+ val switch = SimResourceSwitchMaxMin(scheduler)
+ val providerA = switch.newOutput()
+ val providerB = switch.newOutput()
try {
switch.addInput(SimResourceSource(3200.0, scheduler))
@@ -180,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest {
switch.close()
}
assertAll(
- { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
- { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") },
- { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") },
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index 880e1755..810052b8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimResourceTransformer] class.
@@ -41,7 +42,7 @@ internal class SimResourceTransformerTest {
@Test
fun testExitImmediately() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
launch {
@@ -61,7 +62,7 @@ internal class SimResourceTransformerTest {
@Test
fun testExit() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
launch {
@@ -122,7 +123,7 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -141,7 +142,7 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -160,7 +161,7 @@ internal class SimResourceTransformerTest {
@Test
fun testExitPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
@@ -176,7 +177,7 @@ internal class SimResourceTransformerTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -195,7 +196,7 @@ internal class SimResourceTransformerTest {
@Test
fun testTransformExit() = runBlockingSimulation {
val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -205,4 +206,21 @@ internal class SimResourceTransformerTest {
assertEquals(0, clock.millis())
verify(exactly = 1) { consumer.onNext(any()) }
}
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val forwarder = SimResourceForwarder()
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
+
+ val consumer = SimWorkConsumer(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index ac8b5814..db4fe856 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
+import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
/**
* A test suite for the [SimWorkConsumer] class.
@@ -35,7 +36,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer
internal class SimWorkConsumerTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 1.0)
@@ -50,7 +51,7 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val scheduler = SimResourceInterpreterImpl(coroutineContext, clock)
val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 0.5)