summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-23 17:15:25 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-05-03 22:23:33 +0200
commit80335a49513f3e74228aa1bfb998dd54855f68e2 (patch)
tree3bfdc735ef39353c6c715399c2d9890ff423d4c4
parentb5d6aa7f384ea9d6a1a40965e883ac6403c302fd (diff)
simulator: Introduce SimResourceScheduler
This change introduces the SimResourceScheduler interface, which is a generic interface for scheduling the coordination and synchronization between resource providers and resource consumers. This interface replaces the need for users to manually specify the clock and coroutine context per resource provider.
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt57
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt21
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt (renamed from opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt)26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt15
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt49
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt66
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt17
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt30
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt9
25 files changed, 410 insertions, 213 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 6d87e444..68667a8c 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -105,6 +105,7 @@ public class SimHost(
* The hypervisor to run multiple workloads.
*/
public val hypervisor: SimHypervisor = hypervisor.create(
+ scope.coroutineContext, clock,
object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index bae31921..15714aca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -34,7 +34,8 @@ import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.utils.TimerScheduler
+import org.opendc.simulator.resources.SimResourceScheduler
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -45,13 +46,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: TimerScheduler<Any>
+ private lateinit var scheduler: SimResourceScheduler
private lateinit var machineModel: SimMachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -61,7 +62,7 @@ class SimMachineBenchmarks {
)
}
- @State(Scope.Thread)
+ @State(Scope.Benchmark)
class Workload {
lateinit var trace: Sequence<SimTraceWorkload.Fragment>
@@ -120,7 +121,7 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor()
+ val hypervisor = SimFairShareHypervisor(scheduler)
launch { machine.run(hypervisor) }
@@ -142,12 +143,12 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor()
+ val hypervisor = SimFairShareHypervisor(scheduler)
launch { machine.run(hypervisor) }
coroutineScope {
- repeat(2) { i ->
+ repeat(2) {
val vm = hypervisor.createMachine(machineModel)
launch {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 09ee601e..27ebba21 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
import org.opendc.simulator.compute.cpufreq.ScalingDriver
import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
@@ -60,7 +59,7 @@ public class SimBareMetalMachine(
/**
* The [TimerScheduler] to use for scheduling the interrupts.
*/
- private val scheduler = TimerScheduler<Any>(this.context, clock)
+ private val scheduler = SimResourceSchedulerTrampoline(this.context, clock)
override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
@@ -96,7 +95,6 @@ public class SimBareMetalMachine(
override fun close() {
super.close()
- scheduler.close()
scope.cancel()
}
@@ -107,7 +105,7 @@ public class SimBareMetalMachine(
/**
* The actual resource supporting the processing unit.
*/
- private val source = SimResourceSource(model.frequency, clock, scheduler)
+ private val source = SimResourceSource(model.frequency, scheduler)
override val speed: Double
get() = source.speed
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index fa677de9..11aec2de 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -31,13 +31,13 @@ import org.opendc.simulator.resources.*
*
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
+public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
return SimResourceSwitchMaxMin(
- ctx.clock,
+ scheduler,
object : SimResourceSwitchMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceSwitchMaxMin,
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 02eb6ad0..2ab3ea09 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -22,11 +22,17 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
*/
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimFairShareHypervisor(listener)
+ override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
+ return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener)
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index a5b4526b..b66020f4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.compute
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A service provider interface for constructing a [SimHypervisor].
*/
@@ -37,5 +40,5 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index e2044d05..83b924d7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -22,11 +22,16 @@
package org.opendc.simulator.compute
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
*/
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor()
+ override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
+ return SimSpaceSharedHypervisor()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index a067dd2e..8886caa7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -39,6 +39,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
/**
* Test suite for the [SimHypervisor] class.
@@ -93,7 +94,7 @@ internal class SimHypervisorTest {
)
val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(listener)
+ val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
launch {
machine.run(hypervisor)
@@ -167,7 +168,7 @@ internal class SimHypervisorTest {
coroutineContext, clock, model, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(listener)
+ val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
launch {
machine.run(hypervisor)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index beda3eaa..cd5f33bd 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.utils.TimerScheduler
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -37,67 +37,76 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: TimerScheduler<Any>
+ private lateinit var scheduler: SimResourceScheduler
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
class Workload {
- lateinit var consumers: Array<SimResourceConsumer>
+ lateinit var trace: Sequence<SimTraceConsumer.Fragment>
@Setup
fun setUp() {
- consumers = Array(3) { createSimpleConsumer() }
+ trace = sequenceOf(
+ SimTraceConsumer.Fragment(1000, 28.0),
+ SimTraceConsumer.Fragment(1000, 3500.0),
+ SimTraceConsumer.Fragment(1000, 0.0),
+ SimTraceConsumer.Fragment(1000, 183.0),
+ SimTraceConsumer.Fragment(1000, 400.0),
+ SimTraceConsumer.Fragment(1000, 100.0),
+ SimTraceConsumer.Fragment(1000, 3000.0),
+ SimTraceConsumer.Fragment(1000, 4500.0),
+ )
}
}
@Benchmark
fun benchmarkSource(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, clock, scheduler)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ val provider = SimResourceSource(4200.0, scheduler)
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, clock, scheduler)
+ val provider = SimResourceSource(4200.0, scheduler)
val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
- return@runBlockingSimulation forwarder.consume(state.consumers[0])
+ return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
val provider = switch.addOutput(3500.0)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
repeat(3) { i ->
launch {
val provider = switch.addOutput(3500.0)
- provider.consume(state.consumers[i])
+ provider.consume(SimTraceConsumer(state.trace))
}
}
}
@@ -108,11 +117,11 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
val provider = switch.addOutput(3500.0)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -121,13 +130,13 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
- repeat(2) { i ->
+ repeat(2) {
launch {
val provider = switch.addOutput(3500.0)
- provider.consume(state.consumers[i])
+ provider.consume(SimTraceConsumer(state.trace))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 1bcaf45f..6ae04f27 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -22,12 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* Abstract implementation of [SimResourceAggregator].
*/
-public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator {
/**
* This method is invoked when the resource consumer consumes resources.
*/
@@ -77,7 +75,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
protected val outputContext: SimResourceContext
get() = context
- private val context = object : SimAbstractResourceContext(0.0, clock, _output) {
+ private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) {
override val remainingWork: Double
get() {
val now = clock.millis()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index d2f585b1..c03bfad5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -31,9 +31,16 @@ import kotlin.math.min
*/
public abstract class SimAbstractResourceContext(
initialCapacity: Double,
- override val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val consumer: SimResourceConsumer
-) : SimResourceContext {
+) : SimResourceContext, SimResourceFlushable {
+
+ /**
+ * The clock of the context.
+ */
+ public override val clock: Clock
+ get() = scheduler.clock
+
/**
* The capacity of the resource.
*/
@@ -143,21 +150,12 @@ public abstract class SimAbstractResourceContext(
flush(isIntermediate = true)
doStop()
- } catch (cause: Throwable) {
- doFail(cause)
} finally {
isProcessing = false
}
}
- /**
- * Flush the current active resource consumption.
- *
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun flush(isIntermediate: Boolean = false) {
+ override fun flush(isIntermediate: Boolean) {
// Flush is no-op when the consumer is finished or not yet started
if (state != SimResourceState.Active) {
return
@@ -226,7 +224,7 @@ public abstract class SimAbstractResourceContext(
return
}
- flush()
+ scheduler.schedule(this, isIntermediate = false)
}
override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
@@ -234,7 +232,7 @@ public abstract class SimAbstractResourceContext(
/**
* A flag to indicate that the resource is currently processing a command.
*/
- protected var isProcessing: Boolean = false
+ private var isProcessing: Boolean = false
/**
* The current command that is being processed.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 5d550ad8..5665abd1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -22,12 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
-public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) {
private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 8128c98b..a76cb1e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources
-import java.time.Clock
import kotlin.math.max
import kotlin.math.min
@@ -31,7 +30,7 @@ import kotlin.math.min
*/
public class SimResourceDistributorMaxMin(
override val input: SimResourceProvider,
- private val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
@@ -220,7 +219,7 @@ public class SimResourceDistributorMaxMin(
}
}
- assert(deadline >= clock.millis()) { "Deadline already passed" }
+ assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" }
this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
@@ -337,7 +336,7 @@ public class SimResourceDistributorMaxMin(
private inner class OutputContext(
private val provider: OutputProvider,
consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
*/
@@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin(
}
}
+ private var isProcessing: Boolean = false
+
override fun interrupt() {
// Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
// to infinite recursion.
@@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin(
return
}
- super.interrupt()
+ try {
+ isProcessing = false
- // Force the scheduler to re-schedule
- schedule()
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ } finally {
+ isProcessing = true
+ }
}
override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
index 8d2587b1..f6a1a42e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
@@ -22,22 +22,16 @@
package org.opendc.simulator.resources
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-
/**
- * Helper function to create simple consumer workload.
+ * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer.
*/
-fun createSimpleConsumer(): SimResourceConsumer {
- return SimTraceConsumer(
- sequenceOf(
- SimTraceConsumer.Fragment(1000, 28.0),
- SimTraceConsumer.Fragment(1000, 3500.0),
- SimTraceConsumer.Fragment(1000, 0.0),
- SimTraceConsumer.Fragment(1000, 183.0),
- SimTraceConsumer.Fragment(1000, 400.0),
- SimTraceConsumer.Fragment(1000, 100.0),
- SimTraceConsumer.Fragment(1000, 3000.0),
- SimTraceConsumer.Fragment(1000, 4500.0),
- ),
- )
+public interface SimResourceFlushable {
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun flush(isIntermediate: Boolean)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
new file mode 100644
index 00000000..a228c47b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource
+ * providers and consumers.
+ *
+ * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more
+ * efficiently, reducing the overall work needed per update.
+ */
+public interface SimResourceScheduler {
+ /**
+ * The [Clock] associated with this scheduler.
+ */
+ public val clock: Clock
+
+ /**
+ * Schedule a direct interrupt for the resource context represented by [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false)
+
+ /**
+ * Schedule an interrupt in the future for the resource context represented by [flushable].
+ *
+ * This method will override earlier calls to this method for the same [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param timestamp The timestamp when the interrupt should happen.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false)
+
+ /**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+ public fun batch(block: () -> Unit)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
new file mode 100644
index 00000000..cdbb4a6c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import java.util.ArrayDeque
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after.
+ *
+ * @param clock The virtual simulation clock.
+ */
+public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler {
+ /**
+ * The [TimerScheduler] to actually schedule the interrupts.
+ */
+ private val timers = TimerScheduler<Any>(context, clock)
+
+ /**
+ * A flag to indicate that an interrupt is currently running already.
+ */
+ private var isRunning: Boolean = false
+
+ /**
+ * The queue of resources to be flushed.
+ */
+ private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>()
+
+ override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) {
+ queue.add(flushable to isIntermediate)
+
+ if (isRunning) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) {
+ timers.startSingleTimerTo(flushable, timestamp) {
+ schedule(flushable, isIntermediate)
+ }
+ }
+
+ override fun batch(block: () -> Unit) {
+ val wasAlreadyRunning = isRunning
+ try {
+ isRunning = true
+ block()
+ } finally {
+ if (!wasAlreadyRunning) {
+ isRunning = false
+ }
+ }
+ }
+
+ /**
+ * Flush the scheduled queue.
+ */
+ private fun flush() {
+ val visited = mutableSetOf<SimResourceFlushable>()
+ try {
+ isRunning = true
+ while (queue.isNotEmpty()) {
+ val (flushable, isIntermediate) = queue.poll()
+ flushable.flush(isIntermediate)
+ visited.add(flushable)
+ }
+ } finally {
+ isRunning = false
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index fe569096..3277b889 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
import kotlin.math.ceil
import kotlin.math.min
@@ -31,13 +29,11 @@ import kotlin.math.min
* A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param clock The virtual clock to track simulation time.
* @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource(
initialCapacity: Double,
- private val clock: Clock,
- private val scheduler: TimerScheduler<Any>
+ private val scheduler: SimResourceScheduler
) : SimResourceProvider {
/**
* The current processing speed of the resource.
@@ -96,22 +92,21 @@ public class SimResourceSource(
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) {
override fun onIdle(deadline: Long) {
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
- scheduler.startSingleTimerTo(this, deadline) { flush() }
+ scheduler.schedule(this, deadline)
}
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
val until = min(deadline, clock.millis() + getDuration(work, speed))
-
- scheduler.startSingleTimerTo(this, until, ::flush)
+ scheduler.schedule(this, until)
}
override fun onFinish() {
- scheduler.cancel(this)
+ cancel()
ctx = null
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index c796c251..5dc1e68d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -23,14 +23,13 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.*
-import java.time.Clock
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
public class SimResourceSwitchMaxMin(
- clock: Clock,
+ scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceSwitch {
private val _outputs = mutableSetOf<SimResourceProvider>()
@@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin(
/**
* The aggregator to aggregate the resources.
*/
- private val aggregator = SimResourceAggregatorMaxMin(clock)
+ private val aggregator = SimResourceAggregatorMaxMin(scheduler)
/**
* The distributor to distribute the aggregated resources.
*/
private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, clock,
+ aggregator.output, scheduler,
object : SimResourceDistributorMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceDistributor,
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index e78bcdac..2b32300e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceAggregatorMaxMin] class.
@@ -42,19 +41,19 @@ import org.opendc.utils.TimerScheduler
internal class SimResourceAggregatorMaxMinTest {
@Test
fun testSingleCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val forwarder = SimResourceForwarder()
val sources = listOf(
forwarder,
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(1.0, 0.5)
val usage = mutableListOf<Double>()
- val source = SimResourceSource(1.0, clock, scheduler)
+ val source = SimResourceSource(1.0, scheduler)
val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
source.startConsumer(adapter)
@@ -73,12 +72,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testDoubleCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -100,12 +99,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testOvercommit() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -127,12 +126,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testException() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -152,12 +151,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -177,12 +176,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testFailOverCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 8c15ec71..2e2d6588 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -34,24 +34,26 @@ import org.opendc.simulator.core.runBlockingSimulation
class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
override fun onFinish() {}
}
- context.flush()
+ context.flush(isIntermediate = false)
}
@Test
fun testIntermediateFlush() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -66,10 +68,11 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -89,10 +92,11 @@ class SimResourceContextTest {
@Test
fun testDoubleStart() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -101,4 +105,43 @@ class SimResourceContextTest {
context.start()
assertThrows<IllegalStateException> { context.start() }
}
+
+ @Test
+ fun testIdempodentCapacityChange() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish() {}
+ }
+
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ }
+
+ @Test
+ fun testFailureNoInfiniteLoop() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+ every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
+ every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
+
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish() {}
+ }
+
+ context.start()
+
+ delay(1)
+
+ verify(exactly = 1) { consumer.onFailure(any(), any()) }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 361a1516..5e86088d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimResourceSource] class.
@@ -41,9 +40,9 @@ import org.opendc.utils.TimerScheduler
class SimResourceSourceTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -58,15 +57,14 @@ class SimResourceSourceTest {
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -79,16 +77,15 @@ class SimResourceSourceTest {
assertEquals(3000, clock.millis())
verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -103,7 +100,6 @@ class SimResourceSourceTest {
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
- scheduler.close()
provider.close()
}
}
@@ -114,9 +110,9 @@ class SimResourceSourceTest {
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = object : SimResourceConsumer {
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
@@ -131,16 +127,15 @@ class SimResourceSourceTest {
try {
provider.consume(consumer)
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
lateinit var resCtx: SimResourceContext
val consumer = object : SimResourceConsumer {
@@ -173,16 +168,15 @@ class SimResourceSourceTest {
assertEquals(0, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
@@ -193,16 +187,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -214,16 +207,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -238,16 +230,15 @@ class SimResourceSourceTest {
}
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testClosedConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -260,16 +251,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testCloseDuringConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -283,16 +273,15 @@ class SimResourceSourceTest {
assertEquals(500, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testIdle() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -304,7 +293,6 @@ class SimResourceSourceTest {
assertEquals(500, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@@ -313,9 +301,9 @@ class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -325,7 +313,6 @@ class SimResourceSourceTest {
try {
provider.consume(consumer)
} finally {
- scheduler.close()
provider.close()
}
}
@@ -334,9 +321,9 @@ class SimResourceSourceTest {
@Test
fun testIncorrectDeadline() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -348,7 +335,6 @@ class SimResourceSourceTest {
assertThrows<IllegalArgumentException> { provider.consume(consumer) }
} finally {
- scheduler.close()
provider.close()
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 1b1f7790..32b6d8ad 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -45,7 +44,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val speed = mutableListOf<Double>()
@@ -61,7 +60,7 @@ internal class SimResourceSwitchExclusiveTest {
)
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
val forwarder = SimResourceForwarder()
val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
source.startConsumer(adapter)
@@ -87,14 +86,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
@@ -114,7 +113,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer {
@@ -138,7 +137,7 @@ internal class SimResourceSwitchExclusiveTest {
}
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
@@ -159,14 +158,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 7416f277..e7dec172 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceSwitch] implementations
@@ -41,10 +40,10 @@ import org.opendc.utils.TimerScheduler
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin(clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
+ val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
val provider = switch.addOutput(1000.0)
@@ -57,7 +56,6 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
switch.close()
- scheduler.close()
}
}
@@ -66,7 +64,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
@@ -99,16 +97,15 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(clock, listener)
+ val switch = SimResourceSwitchMaxMin(scheduler, listener)
val provider = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, scheduler))
provider.consume(workload)
yield()
} finally {
switch.close()
- scheduler.close()
}
assertAll(
@@ -124,7 +121,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
@@ -166,12 +163,12 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(clock, listener)
+ val switch = SimResourceSwitchMaxMin(scheduler, listener)
val providerA = switch.addOutput(3200.0)
val providerB = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, scheduler))
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -181,7 +178,6 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
switch.close()
- scheduler.close()
}
assertAll(
{ assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index e3ca5845..880e1755 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimResourceTransformer] class.
@@ -42,8 +41,8 @@ internal class SimResourceTransformerTest {
@Test
fun testExitImmediately() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
launch {
source.consume(forwarder)
@@ -57,14 +56,13 @@ internal class SimResourceTransformerTest {
})
forwarder.close()
- scheduler.close()
}
@Test
fun testExit() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
launch {
source.consume(forwarder)
@@ -124,8 +122,8 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
@@ -143,8 +141,8 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
@@ -162,8 +160,8 @@ internal class SimResourceTransformerTest {
@Test
fun testExitPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Exit
@@ -178,8 +176,8 @@ internal class SimResourceTransformerTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
source.startConsumer(forwarder)
@@ -197,8 +195,8 @@ internal class SimResourceTransformerTest {
@Test
fun testTransformExit() = runBlockingSimulation {
val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
source.startConsumer(forwarder)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index bf58b1b6..ac8b5814 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimWorkConsumer] class.
@@ -36,8 +35,8 @@ import org.opendc.utils.TimerScheduler
internal class SimWorkConsumerTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 1.0)
@@ -51,8 +50,8 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 0.5)