summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-05-03 22:32:32 +0200
committerGitHub <noreply@github.com>2021-05-03 22:32:32 +0200
commit1ce1210be893e7333dfa09266f6990af87c98dc2 (patch)
tree3bfdc735ef39353c6c715399c2d9890ff423d4c4 /opendc-simulator
parent17ffe995ee06d5755cd3943a5ea14f982884009e (diff)
parent80335a49513f3e74228aa1bfb998dd54855f68e2 (diff)
simulator: Add support for central resource scheduling (#126)
This pull request adds support for central resource scheduling. This enables possible optimizations in the future where we can efficiently schedule resource updates. * Introduce `SimResourceScheduler` which centralizes the logic for scheduling resource interrupts. * Fix benchmarks * Add generic approach for reporting resource events * Simplify scheduling logic of resource aggregator. **Breaking API Changes** * Classes in `opendc-simulator-resources` now take `SimResourceScheduler` as opposed to a `CoroutineContext` and `Clock`.
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt40
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt57
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt180
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt76
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt42
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt35
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt55
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt (renamed from opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt)37
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt (renamed from opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt)28
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt32
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt51
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt63
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt86
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt24
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt42
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt9
32 files changed, 716 insertions, 484 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index 7b97a665..15714aca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -31,12 +31,12 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
-import org.opendc.simulator.compute.workload.SimWorkload
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.utils.TimerScheduler
+import org.opendc.simulator.resources.SimResourceScheduler
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
import org.openjdk.jmh.annotations.*
-import java.time.Clock
import java.util.concurrent.TimeUnit
@State(Scope.Thread)
@@ -46,14 +46,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var clock: Clock
- private lateinit var scheduler: TimerScheduler<Any>
+ private lateinit var scheduler: SimResourceScheduler
private lateinit var machineModel: SimMachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = TimerScheduler(scope.coroutineContext, clock)
+ scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -63,13 +62,22 @@ class SimMachineBenchmarks {
)
}
- @State(Scope.Thread)
+ @State(Scope.Benchmark)
class Workload {
- lateinit var workloads: Array<SimWorkload>
+ lateinit var trace: Sequence<SimTraceWorkload.Fragment>
@Setup
fun setUp() {
- workloads = Array(2) { createSimpleConsumer() }
+ trace = sequenceOf(
+ SimTraceWorkload.Fragment(1000, 28.0, 1),
+ SimTraceWorkload.Fragment(1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(1000, 0.0, 1),
+ SimTraceWorkload.Fragment(1000, 183.0, 1),
+ SimTraceWorkload.Fragment(1000, 400.0, 1),
+ SimTraceWorkload.Fragment(1000, 100.0, 1),
+ SimTraceWorkload.Fragment(1000, 3000.0, 1),
+ SimTraceWorkload.Fragment(1000, 4500.0, 1),
+ )
}
}
@@ -80,7 +88,7 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- return@runBlockingSimulation machine.run(state.workloads[0])
+ return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace))
}
}
@@ -98,7 +106,7 @@ class SimMachineBenchmarks {
val vm = hypervisor.createMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(state.workloads[0])
+ return@runBlockingSimulation vm.run(SimTraceWorkload(state.trace))
} finally {
vm.close()
machine.close()
@@ -113,14 +121,14 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor()
+ val hypervisor = SimFairShareHypervisor(scheduler)
launch { machine.run(hypervisor) }
val vm = hypervisor.createMachine(machineModel)
try {
- return@runBlockingSimulation vm.run(state.workloads[0])
+ return@runBlockingSimulation vm.run(SimTraceWorkload(state.trace))
} finally {
vm.close()
machine.close()
@@ -135,17 +143,17 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor()
+ val hypervisor = SimFairShareHypervisor(scheduler)
launch { machine.run(hypervisor) }
coroutineScope {
- repeat(2) { i ->
+ repeat(2) {
val vm = hypervisor.createMachine(machineModel)
launch {
try {
- vm.run(state.workloads[i])
+ vm.run(SimTraceWorkload(state.trace))
} finally {
machine.close()
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 09ee601e..27ebba21 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
import org.opendc.simulator.compute.cpufreq.ScalingDriver
import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
@@ -60,7 +59,7 @@ public class SimBareMetalMachine(
/**
* The [TimerScheduler] to use for scheduling the interrupts.
*/
- private val scheduler = TimerScheduler<Any>(this.context, clock)
+ private val scheduler = SimResourceSchedulerTrampoline(this.context, clock)
override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
@@ -96,7 +95,6 @@ public class SimBareMetalMachine(
override fun close() {
super.close()
- scheduler.close()
scope.cancel()
}
@@ -107,7 +105,7 @@ public class SimBareMetalMachine(
/**
* The actual resource supporting the processing unit.
*/
- private val source = SimResourceSource(model.frequency, clock, scheduler)
+ private val source = SimResourceSource(model.frequency, scheduler)
override val speed: Double
get() = source.speed
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index fa677de9..11aec2de 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -31,13 +31,13 @@ import org.opendc.simulator.resources.*
*
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
+public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
return SimResourceSwitchMaxMin(
- ctx.clock,
+ scheduler,
object : SimResourceSwitchMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceSwitchMaxMin,
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 02eb6ad0..2ab3ea09 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -22,11 +22,17 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
*/
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimFairShareHypervisor(listener)
+ override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
+ return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener)
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index a5b4526b..b66020f4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.compute
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A service provider interface for constructing a [SimHypervisor].
*/
@@ -37,5 +40,5 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index e2044d05..83b924d7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -22,11 +22,16 @@
package org.opendc.simulator.compute
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
*/
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor()
+ override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
+ return SimSpaceSharedHypervisor()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index a067dd2e..8886caa7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -39,6 +39,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
/**
* Test suite for the [SimHypervisor] class.
@@ -93,7 +94,7 @@ internal class SimHypervisorTest {
)
val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(listener)
+ val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
launch {
machine.run(hypervisor)
@@ -167,7 +168,7 @@ internal class SimHypervisorTest {
coroutineContext, clock, model, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(listener)
+ val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
launch {
machine.run(hypervisor)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index beda3eaa..cd5f33bd 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.utils.TimerScheduler
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -37,67 +37,76 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: TimerScheduler<Any>
+ private lateinit var scheduler: SimResourceScheduler
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
class Workload {
- lateinit var consumers: Array<SimResourceConsumer>
+ lateinit var trace: Sequence<SimTraceConsumer.Fragment>
@Setup
fun setUp() {
- consumers = Array(3) { createSimpleConsumer() }
+ trace = sequenceOf(
+ SimTraceConsumer.Fragment(1000, 28.0),
+ SimTraceConsumer.Fragment(1000, 3500.0),
+ SimTraceConsumer.Fragment(1000, 0.0),
+ SimTraceConsumer.Fragment(1000, 183.0),
+ SimTraceConsumer.Fragment(1000, 400.0),
+ SimTraceConsumer.Fragment(1000, 100.0),
+ SimTraceConsumer.Fragment(1000, 3000.0),
+ SimTraceConsumer.Fragment(1000, 4500.0),
+ )
}
}
@Benchmark
fun benchmarkSource(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, clock, scheduler)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ val provider = SimResourceSource(4200.0, scheduler)
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, clock, scheduler)
+ val provider = SimResourceSource(4200.0, scheduler)
val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
- return@runBlockingSimulation forwarder.consume(state.consumers[0])
+ return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
val provider = switch.addOutput(3500.0)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
repeat(3) { i ->
launch {
val provider = switch.addOutput(3500.0)
- provider.consume(state.consumers[i])
+ provider.consume(SimTraceConsumer(state.trace))
}
}
}
@@ -108,11 +117,11 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
val provider = switch.addOutput(3500.0)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -121,13 +130,13 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
- repeat(2) { i ->
+ repeat(2) {
launch {
val provider = switch.addOutput(3500.0)
- provider.consume(state.consumers[i])
+ provider.consume(SimTraceConsumer(state.trace))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index c7fa6a17..6ae04f27 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -22,30 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* Abstract implementation of [SimResourceAggregator].
*/
-public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
- /**
- * The available resource provider contexts.
- */
- protected val inputContexts: Set<SimResourceContext>
- get() = _inputContexts
- private val _inputContexts = mutableSetOf<SimResourceContext>()
-
- /**
- * The output context.
- */
- protected val outputContext: SimResourceContext
- get() = context
-
- /**
- * The commands to submit to the underlying input resources.
- */
- protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf()
-
+public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator {
/**
* This method is invoked when the resource consumer consumes resources.
*/
@@ -54,37 +34,29 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
/**
* This method is invoked when the resource consumer enters an idle state.
*/
- protected open fun doIdle(deadline: Long) {
- for (input in inputContexts) {
- commands[input] = SimResourceCommand.Idle(deadline)
- }
- }
+ protected abstract fun doIdle(deadline: Long)
/**
* This method is invoked when the resource consumer finishes processing.
*/
- protected open fun doFinish(cause: Throwable?) {
- for (input in inputContexts) {
- commands[input] = SimResourceCommand.Exit
- }
- }
+ protected abstract fun doFinish(cause: Throwable?)
/**
* This method is invoked when an input context is started.
*/
- protected open fun onContextStarted(ctx: SimResourceContext) {
- _inputContexts.add(ctx)
- }
+ protected abstract fun onInputStarted(input: Input)
- protected open fun onContextFinished(ctx: SimResourceContext) {
- assert(_inputContexts.remove(ctx)) { "Lost context" }
- }
+ /**
+ * This method is invoked when an input is stopped.
+ */
+ protected abstract fun onInputFinished(input: Input)
override fun addInput(input: SimResourceProvider) {
check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
val consumer = Consumer()
_inputs.add(input)
+ _inputConsumers.add(consumer)
input.startConsumer(consumer)
}
@@ -99,15 +71,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
override val inputs: Set<SimResourceProvider>
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
+ private val _inputConsumers = mutableListOf<Consumer>()
- private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
+ protected val outputContext: SimResourceContext
+ get() = context
+ private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) {
override val remainingWork: Double
get() {
val now = clock.millis()
return if (_remainingWorkFlush < now) {
_remainingWorkFlush = now
- _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ _inputConsumers.sumByDouble { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it }
} else {
_remainingWork
}
@@ -115,94 +90,89 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
private var _remainingWork: Double = 0.0
private var _remainingWorkFlush: Long = Long.MIN_VALUE
- override fun interrupt() {
- super.interrupt()
-
- interruptAll()
- }
-
override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
override fun onIdle(deadline: Long) = doIdle(deadline)
- override fun onFinish(cause: Throwable?) {
- doFinish(cause)
-
- super.onFinish(cause)
-
- interruptAll()
+ override fun onFinish() {
+ doFinish(null)
}
}
/**
- * A flag to indicate that an interrupt is active.
- */
- private var isInterrupting: Boolean = false
-
- /**
- * Schedule the work over the input resources.
+ * An input for the resource aggregator.
*/
- private fun doSchedule() {
- context.flush(isIntermediate = true)
- interruptAll()
+ public interface Input {
+ /**
+ * The [SimResourceContext] associated with the input.
+ */
+ public val ctx: SimResourceContext
+
+ /**
+ * Push the specified [SimResourceCommand] to the input.
+ */
+ public fun push(command: SimResourceCommand)
}
/**
- * Interrupt all inputs.
+ * An internal [SimResourceConsumer] implementation for aggregator inputs.
*/
- private fun interruptAll() {
- // Prevent users from interrupting the resource while they are constructing their next command, as this will
- // only lead to infinite recursion.
- if (isInterrupting) {
- return
- }
-
- try {
- isInterrupting = true
-
- val iterator = _inputs.iterator()
- while (iterator.hasNext()) {
- val input = iterator.next()
- input.interrupt()
-
- if (input.state != SimResourceState.Active) {
- iterator.remove()
- }
- }
- } finally {
- isInterrupting = false
+ private inner class Consumer : Input, SimResourceConsumer {
+ /**
+ * The resource context associated with the input.
+ */
+ override val ctx: SimResourceContext
+ get() = _ctx!!
+ var _ctx: SimResourceContext? = null
+
+ /**
+ * The resource command to run next.
+ */
+ private var command: SimResourceCommand? = null
+
+ private fun updateCapacity() {
+ // Adjust capacity of output resource
+ context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 }
}
- }
- /**
- * An internal [SimResourceConsumer] implementation for aggregator inputs.
- */
- private inner class Consumer : SimResourceConsumer {
- override fun onStart(ctx: SimResourceContext) {
- onContextStarted(ctx)
- onCapacityChanged(ctx, false)
-
- // Make sure we initialize the output if we have not done so yet
- if (context.state == SimResourceState.Pending) {
- context.start()
- }
+ /* Input */
+ override fun push(command: SimResourceCommand) {
+ this.command = command
+ _ctx?.interrupt()
}
+ /* SimResourceConsumer */
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- doSchedule()
-
- return commands[ctx] ?: SimResourceCommand.Idle()
+ var next = command
+
+ return if (next != null) {
+ this.command = null
+ next
+ } else {
+ context.flush(isIntermediate = true)
+ next = command
+ this.command = null
+ next ?: SimResourceCommand.Idle()
+ }
}
- override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- // Adjust capacity of output resource
- context.capacity = inputContexts.sumByDouble { it.capacity }
- }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ _ctx = ctx
+ updateCapacity()
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- onContextFinished(ctx)
+ // Make sure we initialize the output if we have not done so yet
+ if (context.state == SimResourceState.Pending) {
+ context.start()
+ }
- super.onFinish(ctx, cause)
+ onInputStarted(this)
+ }
+ SimResourceEvent.Capacity -> updateCapacity()
+ SimResourceEvent.Exit -> onInputFinished(this)
+ else -> {}
+ }
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 05ed0714..c03bfad5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -31,9 +31,16 @@ import kotlin.math.min
*/
public abstract class SimAbstractResourceContext(
initialCapacity: Double,
- override val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val consumer: SimResourceConsumer
-) : SimResourceContext {
+) : SimResourceContext, SimResourceFlushable {
+
+ /**
+ * The clock of the context.
+ */
+ public override val clock: Clock
+ get() = scheduler.clock
+
/**
* The capacity of the resource.
*/
@@ -75,7 +82,7 @@ public abstract class SimAbstractResourceContext(
/**
* The current processing speed of the resource.
*/
- public var speed: Double = 0.0
+ final override var speed: Double = 0.0
private set
/**
@@ -92,9 +99,7 @@ public abstract class SimAbstractResourceContext(
/**
* This method is invoked when the resource consumer has finished.
*/
- public open fun onFinish(cause: Throwable?) {
- consumer.onFinish(this, cause)
- }
+ public abstract fun onFinish()
/**
* Get the remaining work to process after a resource consumption.
@@ -126,10 +131,10 @@ public abstract class SimAbstractResourceContext(
latestFlush = now
try {
- consumer.onStart(this)
+ consumer.onEvent(this, SimResourceEvent.Start)
activeCommand = interpret(consumer.onNext(this), now)
} catch (cause: Throwable) {
- doStop(cause)
+ doFail(cause)
} finally {
isProcessing = false
}
@@ -144,22 +149,13 @@ public abstract class SimAbstractResourceContext(
latestFlush = clock.millis()
flush(isIntermediate = true)
- doStop(null)
- } catch (cause: Throwable) {
- doStop(cause)
+ doStop()
} finally {
isProcessing = false
}
}
- /**
- * Flush the current active resource consumption.
- *
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun flush(isIntermediate: Boolean = false) {
+ override fun flush(isIntermediate: Boolean) {
// Flush is no-op when the consumer is finished or not yet started
if (state != SimResourceState.Active) {
return
@@ -214,7 +210,7 @@ public abstract class SimAbstractResourceContext(
// Flush remaining work cache
_remainingWorkFlush = Long.MIN_VALUE
} catch (cause: Throwable) {
- doStop(cause)
+ doFail(cause)
} finally {
latestFlush = now
isProcessing = false
@@ -228,7 +224,7 @@ public abstract class SimAbstractResourceContext(
return
}
- flush()
+ scheduler.schedule(this, isIntermediate = false)
}
override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
@@ -236,7 +232,7 @@ public abstract class SimAbstractResourceContext(
/**
* A flag to indicate that the resource is currently processing a command.
*/
- protected var isProcessing: Boolean = false
+ private var isProcessing: Boolean = false
/**
* The current command that is being processed.
@@ -251,13 +247,18 @@ public abstract class SimAbstractResourceContext(
/**
* Finish the consumer and resource provider.
*/
- private fun doStop(cause: Throwable?) {
+ private fun doStop() {
val state = state
this.state = SimResourceState.Stopped
if (state == SimResourceState.Active) {
activeCommand = null
- onFinish(cause)
+ try {
+ consumer.onEvent(this, SimResourceEvent.Exit)
+ onFinish()
+ } catch (cause: Throwable) {
+ doFail(cause)
+ }
}
}
@@ -272,9 +273,9 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = 0.0
- consumer.onConfirm(this, 0.0)
onIdle(deadline)
+ consumer.onEvent(this, SimResourceEvent.Run)
}
is SimResourceCommand.Consume -> {
val work = command.work
@@ -284,14 +285,13 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = min(capacity, limit)
- consumer.onConfirm(this, speed)
-
onConsume(work, limit, deadline)
+ consumer.onEvent(this, SimResourceEvent.Run)
}
is SimResourceCommand.Exit -> {
speed = 0.0
- doStop(null)
+ doStop()
// No need to set the next active command
return null
@@ -319,6 +319,23 @@ public abstract class SimAbstractResourceContext(
}
/**
+ * Fail the resource consumer.
+ */
+ private fun doFail(cause: Throwable) {
+ state = SimResourceState.Stopped
+ activeCommand = null
+
+ try {
+ consumer.onFailure(this, cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ e.printStackTrace()
+ }
+
+ onFinish()
+ }
+
+ /**
* Indicate that the capacity of the resource has changed.
*/
private fun onCapacityChange() {
@@ -328,7 +345,8 @@ public abstract class SimAbstractResourceContext(
}
val isThrottled = speed > capacity
- consumer.onCapacityChanged(this, isThrottled)
+
+ consumer.onEvent(this, SimResourceEvent.Capacity)
// Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
// Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 08bc064e..5665abd1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -22,42 +22,50 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
-public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
- private val consumers = mutableListOf<SimResourceContext>()
+public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) {
+ private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
// Sort all consumers by their capacity
- consumers.sortWith(compareBy { it.capacity })
+ consumers.sortWith(compareBy { it.ctx.capacity })
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
- val inputCapacity = input.capacity
+ val inputCapacity = input.ctx.capacity
val fraction = inputCapacity / outputContext.capacity
val grantedSpeed = limit * fraction
val grantedWork = fraction * work
- commands[input] =
- if (grantedWork > 0.0 && grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ val command = if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ input.push(command)
}
}
- override fun onContextStarted(ctx: SimResourceContext) {
- super.onContextStarted(ctx)
+ override fun doIdle(deadline: Long) {
+ for (input in consumers) {
+ input.push(SimResourceCommand.Idle(deadline))
+ }
+ }
- consumers.add(ctx)
+ override fun doFinish(cause: Throwable?) {
+ val iterator = consumers.iterator()
+ for (input in iterator) {
+ iterator.remove()
+ input.push(SimResourceCommand.Exit)
+ }
}
- override fun onContextFinished(ctx: SimResourceContext) {
- super.onContextFinished(ctx)
+ override fun onInputStarted(input: Input) {
+ consumers.add(input)
+ }
- consumers.remove(ctx)
+ override fun onInputFinished(input: Input) {
+ consumers.remove(input)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 38672b13..4d937514 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -30,13 +30,6 @@ package org.opendc.simulator.resources
*/
public interface SimResourceConsumer {
/**
- * This method is invoked when the consumer is started for some resource.
- *
- * @param ctx The execution context in which the consumer runs.
- */
- public fun onStart(ctx: SimResourceContext) {}
-
- /**
* This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
* the resource finished processing, reached its deadline or was interrupted.
*
@@ -46,34 +39,18 @@ public interface SimResourceConsumer {
public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
- * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ * This method is invoked when an event has occurred.
*
* @param ctx The execution context in which the consumer runs.
- * @param speed The speed at which the consumer runs.
+ * @param event The event that has occurred.
*/
- public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+ public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {}
/**
- * This is method is invoked when the capacity of the resource changes.
- *
- * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
- * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly
- * causing the active resource command to finish at a later moment than initially planned.
+ * This method is invoked when a resource consumer throws an exception.
*
* @param ctx The execution context in which the consumer runs.
- * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the
- * capacity change.
- */
- public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {}
-
- /**
- * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
- * the resource finished itself, or a failure occurred at the resource.
- *
- * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
- *
- * @param ctx The execution context in which the consumer ran.
- * @param cause The cause of the finish in case the resource finished exceptionally.
+ * @param cause The cause of the failure.
*/
- public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {}
+ public fun onFailure(ctx: SimResourceContext, cause: Throwable) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 11dbb09f..7c76c634 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -40,6 +40,11 @@ public interface SimResourceContext {
public val capacity: Double
/**
+ * The resource processing speed at this instant.
+ */
+ public val speed: Double
+
+ /**
* The amount of work still remaining at this instant.
*/
public val remainingWork: Double
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index dfdd2c2e..a76cb1e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources
-import java.time.Clock
import kotlin.math.max
import kotlin.math.min
@@ -31,7 +30,7 @@ import kotlin.math.min
*/
public class SimResourceDistributorMaxMin(
override val input: SimResourceProvider,
- private val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
@@ -90,26 +89,28 @@ public class SimResourceDistributorMaxMin(
val remainingWork: Double
get() = ctx.remainingWork
- override fun onStart(ctx: SimResourceContext) {
- this.ctx = ctx
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
return doNext(ctx.capacity)
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- super.onFinish(ctx, cause)
-
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
- output.close()
+ output.close()
+ }
+ }
+ else -> {}
}
}
}
@@ -218,7 +219,7 @@ public class SimResourceDistributorMaxMin(
}
}
- assert(deadline >= clock.millis()) { "Deadline already passed" }
+ assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" }
this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
@@ -335,7 +336,7 @@ public class SimResourceDistributorMaxMin(
private inner class OutputContext(
private val provider: OutputProvider,
consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
*/
@@ -370,13 +371,11 @@ public class SimResourceDistributorMaxMin(
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
}
- override fun onFinish(cause: Throwable?) {
+ override fun onFinish() {
reportOvercommit()
activeCommand = SimResourceCommand.Exit
provider.cancel()
-
- super.onFinish(cause)
}
override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
@@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin(
}
}
+ private var isProcessing: Boolean = false
+
override fun interrupt() {
// Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
// to infinite recursion.
@@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin(
return
}
- super.interrupt()
+ try {
+ isProcessing = false
- // Force the scheduler to re-schedule
- schedule()
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ } finally {
+ isProcessing = true
+ }
}
override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt
index 8d2587b1..959427f1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt
@@ -22,22 +22,27 @@
package org.opendc.simulator.resources
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-
/**
- * Helper function to create simple consumer workload.
+ * A resource event that is communicated to the resource consumer.
*/
-fun createSimpleConsumer(): SimResourceConsumer {
- return SimTraceConsumer(
- sequenceOf(
- SimTraceConsumer.Fragment(1000, 28.0),
- SimTraceConsumer.Fragment(1000, 3500.0),
- SimTraceConsumer.Fragment(1000, 0.0),
- SimTraceConsumer.Fragment(1000, 183.0),
- SimTraceConsumer.Fragment(1000, 400.0),
- SimTraceConsumer.Fragment(1000, 100.0),
- SimTraceConsumer.Fragment(1000, 3000.0),
- SimTraceConsumer.Fragment(1000, 4500.0),
- ),
- )
+public enum class SimResourceEvent {
+ /**
+ * This event is emitted to the consumer when it has started.
+ */
+ Start,
+
+ /**
+ * This event is emitted to the consumer when it has exited.
+ */
+ Exit,
+
+ /**
+ * This event is emitted to the consumer when it has started a new resource consumption or idle cycle.
+ */
+ Run,
+
+ /**
+ * This event is emitted to the consumer when the capacity of the resource has changed.
+ */
+ Capacity,
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
index 43bbfd0b..f6a1a42e 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
@@ -20,24 +20,18 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute
-
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+package org.opendc.simulator.resources
/**
- * Helper function to create simple consumer workload.
+ * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer.
*/
-fun createSimpleConsumer(): SimTraceWorkload {
- return SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(1000, 28.0, 1),
- SimTraceWorkload.Fragment(1000, 3500.0, 1),
- SimTraceWorkload.Fragment(1000, 0.0, 1),
- SimTraceWorkload.Fragment(1000, 183.0, 1),
- SimTraceWorkload.Fragment(1000, 400.0, 1),
- SimTraceWorkload.Fragment(1000, 100.0, 1),
- SimTraceWorkload.Fragment(1000, 3000.0, 1),
- SimTraceWorkload.Fragment(1000, 4500.0, 1),
- ),
- )
+public interface SimResourceFlushable {
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun flush(isIntermediate: Boolean)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 52b13c5c..2f567a5e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -23,6 +23,8 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
/**
* A [SimResourceProvider] provides some resource of type [R].
@@ -65,15 +67,27 @@ public interface SimResourceProvider : AutoCloseable {
public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
return suspendCancellableCoroutine { cont ->
startConsumer(object : SimResourceConsumer by consumer {
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- assert(!cont.isCompleted) { "Coroutine already completed" }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ consumer.onEvent(ctx, event)
- consumer.onFinish(ctx, cause)
+ if (event == SimResourceEvent.Exit && !cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ }
- cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ try {
+ consumer.onFailure(ctx, cause)
+ cont.resumeWithException(cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ cont.resumeWithException(e)
+ }
}
override fun toString(): String = "SimSuspendingResourceConsumer"
})
+
+ cont.invokeOnCancellation { cancel() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
new file mode 100644
index 00000000..a228c47b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource
+ * providers and consumers.
+ *
+ * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more
+ * efficiently, reducing the overall work needed per update.
+ */
+public interface SimResourceScheduler {
+ /**
+ * The [Clock] associated with this scheduler.
+ */
+ public val clock: Clock
+
+ /**
+ * Schedule a direct interrupt for the resource context represented by [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false)
+
+ /**
+ * Schedule an interrupt in the future for the resource context represented by [flushable].
+ *
+ * This method will override earlier calls to this method for the same [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param timestamp The timestamp when the interrupt should happen.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false)
+
+ /**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+ public fun batch(block: () -> Unit)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
new file mode 100644
index 00000000..cdbb4a6c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import java.util.ArrayDeque
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after.
+ *
+ * @param clock The virtual simulation clock.
+ */
+public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler {
+ /**
+ * The [TimerScheduler] to actually schedule the interrupts.
+ */
+ private val timers = TimerScheduler<Any>(context, clock)
+
+ /**
+ * A flag to indicate that an interrupt is currently running already.
+ */
+ private var isRunning: Boolean = false
+
+ /**
+ * The queue of resources to be flushed.
+ */
+ private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>()
+
+ override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) {
+ queue.add(flushable to isIntermediate)
+
+ if (isRunning) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) {
+ timers.startSingleTimerTo(flushable, timestamp) {
+ schedule(flushable, isIntermediate)
+ }
+ }
+
+ override fun batch(block: () -> Unit) {
+ val wasAlreadyRunning = isRunning
+ try {
+ isRunning = true
+ block()
+ } finally {
+ if (!wasAlreadyRunning) {
+ isRunning = false
+ }
+ }
+ }
+
+ /**
+ * Flush the scheduled queue.
+ */
+ private fun flush() {
+ val visited = mutableSetOf<SimResourceFlushable>()
+ try {
+ isRunning = true
+ while (queue.isNotEmpty()) {
+ val (flushable, isIntermediate) = queue.poll()
+ flushable.flush(isIntermediate)
+ visited.add(flushable)
+ }
+ } finally {
+ isRunning = false
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 025b0406..3277b889 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
import kotlin.math.ceil
import kotlin.math.min
@@ -31,13 +29,11 @@ import kotlin.math.min
* A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param clock The virtual clock to track simulation time.
* @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource(
initialCapacity: Double,
- private val clock: Clock,
- private val scheduler: TimerScheduler<Any>
+ private val scheduler: SimResourceScheduler
) : SimResourceProvider {
/**
* The current processing speed of the resource.
@@ -96,25 +92,27 @@ public class SimResourceSource(
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) {
override fun onIdle(deadline: Long) {
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
- scheduler.startSingleTimerTo(this, deadline) { flush() }
+ scheduler.schedule(this, deadline)
}
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
val until = min(deadline, clock.millis() + getDuration(work, speed))
-
- scheduler.startSingleTimerTo(this, until, ::flush)
+ scheduler.schedule(this, until)
}
- override fun onFinish(cause: Throwable?) {
- scheduler.cancel(this)
+ override fun onFinish() {
cancel()
- super.onFinish(cause)
+ ctx = null
+
+ if (this@SimResourceSource.state != SimResourceState.Stopped) {
+ this@SimResourceSource.state = SimResourceState.Pending
+ }
}
override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 45e4c220..1a9dd0bc 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -66,10 +66,13 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
availableResources += forwarder
input.startConsumer(object : SimResourceConsumer by forwarder {
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- // De-register the input after it has finished
- _inputs -= input
- forwarder.onFinish(ctx, cause)
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ if (event == SimResourceEvent.Exit) {
+ // De-register the input after it has finished
+ _inputs -= input
+ }
+
+ forwarder.onEvent(ctx, event)
}
})
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index c796c251..5dc1e68d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -23,14 +23,13 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.*
-import java.time.Clock
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
public class SimResourceSwitchMaxMin(
- clock: Clock,
+ scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceSwitch {
private val _outputs = mutableSetOf<SimResourceProvider>()
@@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin(
/**
* The aggregator to aggregate the resources.
*/
- private val aggregator = SimResourceAggregatorMaxMin(clock)
+ private val aggregator = SimResourceAggregatorMaxMin(scheduler)
/**
* The distributor to distribute the aggregated resources.
*/
private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, clock,
+ aggregator.output, scheduler,
object : SimResourceDistributorMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceDistributor,
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index de455021..32f3f573 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -75,7 +75,7 @@ public class SimResourceTransformer(
if (delegate != null && ctx != null) {
this.delegate = null
- delegate.onFinish(ctx)
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
}
}
@@ -90,10 +90,6 @@ public class SimResourceTransformer(
}
}
- override fun onStart(ctx: SimResourceContext) {
- this.ctx = ctx
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val delegate = delegate
@@ -110,7 +106,7 @@ public class SimResourceTransformer(
// reset beforehand the existing state and check whether it has been updated afterwards
reset()
- delegate.onFinish(ctx)
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
if (isCoupled || state == SimResourceState.Stopped)
SimResourceCommand.Exit
@@ -124,21 +120,31 @@ public class SimResourceTransformer(
}
}
- override fun onConfirm(ctx: SimResourceContext, speed: Double) {
- delegate?.onConfirm(ctx, speed)
- }
-
- override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- delegate?.onCapacityChanged(ctx, isThrottled)
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ }
+ SimResourceEvent.Exit -> {
+ this.ctx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
+ }
+ }
+ else -> delegate?.onEvent(ctx, event)
+ }
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
this.ctx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onFinish(ctx, cause)
+ delegate.onFailure(ctx, cause)
}
}
@@ -147,7 +153,7 @@ public class SimResourceTransformer(
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onStart(checkNotNull(ctx))
+ delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start)
hasDelegateStarted = true
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index 114c7312..4f4ebb14 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
import kotlin.math.min
/**
@@ -53,28 +54,29 @@ public class SimSpeedConsumerAdapter(
return delegate.onNext(ctx)
}
- override fun onConfirm(ctx: SimResourceContext, speed: Double) {
- delegate.onConfirm(ctx, speed)
-
- this.speed = speed
- }
-
- override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
val oldSpeed = speed
- delegate.onCapacityChanged(ctx, isThrottled)
+ delegate.onEvent(ctx, event)
- // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
- // need to update the current speed.
- if (oldSpeed == speed) {
- speed = min(ctx.capacity, speed)
+ when (event) {
+ SimResourceEvent.Run -> speed = ctx.speed
+ SimResourceEvent.Capacity -> {
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
+ }
+ }
+ SimResourceEvent.Exit -> speed = 0.0
+ else -> {}
}
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- super.onFinish(ctx, cause)
-
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
speed = 0.0
+
+ delegate.onFailure(ctx, cause)
}
override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index a52d1d5d..2e94e1c1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
/**
* A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -33,11 +34,6 @@ import org.opendc.simulator.resources.SimResourceContext
public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
private var iterator: Iterator<Fragment>? = null
- override fun onStart(ctx: SimResourceContext) {
- check(iterator == null) { "Consumer already running" }
- iterator = trace.iterator()
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
@@ -57,8 +53,17 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
}
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- iterator = null
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
+ }
+ SimResourceEvent.Exit -> {
+ iterator = null
+ }
+ else -> {}
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index e272abb8..2b32300e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceAggregatorMaxMin] class.
@@ -42,19 +41,19 @@ import org.opendc.utils.TimerScheduler
internal class SimResourceAggregatorMaxMinTest {
@Test
fun testSingleCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val forwarder = SimResourceForwarder()
val sources = listOf(
forwarder,
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(1.0, 0.5)
val usage = mutableListOf<Double>()
- val source = SimResourceSource(1.0, clock, scheduler)
+ val source = SimResourceSource(1.0, scheduler)
val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
source.startConsumer(adapter)
@@ -73,12 +72,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testDoubleCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -100,12 +99,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testOvercommit() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -127,19 +126,19 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testException() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
.returns(SimResourceCommand.Consume(1.0, 1.0))
- .andThenThrows(IllegalStateException())
+ .andThenThrows(IllegalStateException("Test Exception"))
try {
assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }
@@ -152,12 +151,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -177,12 +176,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testFailOverCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index be909556..2e2d6588 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -34,26 +34,28 @@ import org.opendc.simulator.core.runBlockingSimulation
class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
}
- context.flush()
+ context.flush(isIntermediate = false)
}
@Test
fun testIntermediateFlush() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
})
@@ -66,12 +68,13 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
})
@@ -83,22 +86,62 @@ class SimResourceContextTest {
assertAll(
{ verify(exactly = 2) { context.onIdle(any()) } },
- { verify(exactly = 1) { context.onFinish(null) } }
+ { verify(exactly = 1) { context.onFinish() } }
)
}
@Test
fun testDoubleStart() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
}
context.start()
assertThrows<IllegalStateException> { context.start() }
}
+
+ @Test
+ fun testIdempodentCapacityChange() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish() {}
+ }
+
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ }
+
+ @Test
+ fun testFailureNoInfiniteLoop() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+ every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
+ every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
+
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish() {}
+ }
+
+ context.start()
+
+ delay(1)
+
+ verify(exactly = 1) { consumer.onFailure(any(), any()) }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 39f74481..5e86088d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimResourceSource] class.
@@ -41,9 +40,9 @@ import org.opendc.utils.TimerScheduler
class SimResourceSourceTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -58,15 +57,14 @@ class SimResourceSourceTest {
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -77,18 +75,17 @@ class SimResourceSourceTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -103,7 +100,6 @@ class SimResourceSourceTest {
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
- scheduler.close()
provider.close()
}
}
@@ -114,39 +110,42 @@ class SimResourceSourceTest {
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = object : SimResourceConsumer {
- override fun onStart(ctx: SimResourceContext) {
- ctx.interrupt()
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
return SimResourceCommand.Exit
}
+
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ ctx.interrupt()
+ }
}
try {
provider.consume(consumer)
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
lateinit var resCtx: SimResourceContext
val consumer = object : SimResourceConsumer {
var isFirst = true
- override fun onStart(ctx: SimResourceContext) {
- resCtx = ctx
+
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> resCtx = ctx
+ else -> {}
+ }
}
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
@@ -169,19 +168,18 @@ class SimResourceSourceTest {
assertEquals(0, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onStart(any()) }
+ every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
.throws(IllegalStateException())
try {
@@ -189,16 +187,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -210,16 +207,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -234,16 +230,15 @@ class SimResourceSourceTest {
}
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testClosedConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -256,16 +251,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testCloseDuringConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -279,16 +273,15 @@ class SimResourceSourceTest {
assertEquals(500, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testIdle() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -300,7 +293,6 @@ class SimResourceSourceTest {
assertEquals(500, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@@ -309,9 +301,9 @@ class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -321,7 +313,6 @@ class SimResourceSourceTest {
try {
provider.consume(consumer)
} finally {
- scheduler.close()
provider.close()
}
}
@@ -330,9 +321,9 @@ class SimResourceSourceTest {
@Test
fun testIncorrectDeadline() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -344,7 +335,6 @@ class SimResourceSourceTest {
assertThrows<IllegalArgumentException> { provider.consume(consumer) }
} finally {
- scheduler.close()
provider.close()
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index f7d17867..32b6d8ad 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -45,7 +44,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val speed = mutableListOf<Double>()
@@ -61,7 +60,7 @@ internal class SimResourceSwitchExclusiveTest {
)
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
val forwarder = SimResourceForwarder()
val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
source.startConsumer(adapter)
@@ -87,14 +86,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
@@ -114,14 +113,17 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer {
var isFirst = true
- override fun onStart(ctx: SimResourceContext) {
- isFirst = true
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> isFirst = true
+ else -> {}
+ }
}
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
@@ -135,7 +137,7 @@ internal class SimResourceSwitchExclusiveTest {
}
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
@@ -156,14 +158,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 7416f277..e7dec172 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceSwitch] implementations
@@ -41,10 +40,10 @@ import org.opendc.utils.TimerScheduler
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin(clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
+ val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
val provider = switch.addOutput(1000.0)
@@ -57,7 +56,6 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
switch.close()
- scheduler.close()
}
}
@@ -66,7 +64,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
@@ -99,16 +97,15 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(clock, listener)
+ val switch = SimResourceSwitchMaxMin(scheduler, listener)
val provider = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, scheduler))
provider.consume(workload)
yield()
} finally {
switch.close()
- scheduler.close()
}
assertAll(
@@ -124,7 +121,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
@@ -166,12 +163,12 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(clock, listener)
+ val switch = SimResourceSwitchMaxMin(scheduler, listener)
val providerA = switch.addOutput(3200.0)
val providerB = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, scheduler))
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -181,7 +178,6 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
switch.close()
- scheduler.close()
}
assertAll(
{ assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index d2ad73bc..880e1755 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimResourceTransformer] class.
@@ -42,8 +41,8 @@ internal class SimResourceTransformerTest {
@Test
fun testExitImmediately() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
launch {
source.consume(forwarder)
@@ -57,14 +56,13 @@ internal class SimResourceTransformerTest {
})
forwarder.close()
- scheduler.close()
}
@Test
fun testExit() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
launch {
source.consume(forwarder)
@@ -118,14 +116,14 @@ internal class SimResourceTransformerTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onFinish(any(), null) }
+ verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) }
}
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
@@ -136,15 +134,15 @@ internal class SimResourceTransformerTest {
yield()
forwarder.cancel()
- verify(exactly = 1) { consumer.onStart(any()) }
- verify(exactly = 1) { consumer.onFinish(any(), null) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
}
@Test
fun testCancelPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
@@ -155,15 +153,15 @@ internal class SimResourceTransformerTest {
yield()
source.cancel()
- verify(exactly = 1) { consumer.onStart(any()) }
- verify(exactly = 1) { consumer.onFinish(any(), null) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
}
@Test
fun testExitPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Exit
@@ -178,8 +176,8 @@ internal class SimResourceTransformerTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
source.startConsumer(forwarder)
@@ -191,14 +189,14 @@ internal class SimResourceTransformerTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
}
@Test
fun testTransformExit() = runBlockingSimulation {
val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
source.startConsumer(forwarder)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index bf58b1b6..ac8b5814 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimWorkConsumer] class.
@@ -36,8 +35,8 @@ import org.opendc.utils.TimerScheduler
internal class SimWorkConsumerTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 1.0)
@@ -51,8 +50,8 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 0.5)