summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt6
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt4
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt7
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt57
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt6
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt21
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt (renamed from opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt)26
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt69
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt95
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt15
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt49
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt66
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt17
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt30
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt9
25 files changed, 410 insertions, 213 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 6d87e444..68667a8c 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -105,6 +105,7 @@ public class SimHost(
* The hypervisor to run multiple workloads.
*/
public val hypervisor: SimHypervisor = hypervisor.create(
+ scope.coroutineContext, clock,
object : SimHypervisor.Listener {
override fun onSliceFinish(
hypervisor: SimHypervisor,
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index bae31921..15714aca 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -34,7 +34,8 @@ import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.utils.TimerScheduler
+import org.opendc.simulator.resources.SimResourceScheduler
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -45,13 +46,13 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: TimerScheduler<Any>
+ private lateinit var scheduler: SimResourceScheduler
private lateinit var machineModel: SimMachineModel
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
@@ -61,7 +62,7 @@ class SimMachineBenchmarks {
)
}
- @State(Scope.Thread)
+ @State(Scope.Benchmark)
class Workload {
lateinit var trace: Sequence<SimTraceWorkload.Fragment>
@@ -120,7 +121,7 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor()
+ val hypervisor = SimFairShareHypervisor(scheduler)
launch { machine.run(hypervisor) }
@@ -142,12 +143,12 @@ class SimMachineBenchmarks {
coroutineContext, clock, machineModel, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor()
+ val hypervisor = SimFairShareHypervisor(scheduler)
launch { machine.run(hypervisor) }
coroutineScope {
- repeat(2) { i ->
+ repeat(2) {
val vm = hypervisor.createMachine(machineModel)
launch {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 09ee601e..27ebba21 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.compute
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.*
import org.opendc.simulator.compute.cpufreq.ScalingDriver
import org.opendc.simulator.compute.cpufreq.ScalingGovernor
import org.opendc.simulator.compute.model.ProcessingUnit
@@ -60,7 +59,7 @@ public class SimBareMetalMachine(
/**
* The [TimerScheduler] to use for scheduling the interrupts.
*/
- private val scheduler = TimerScheduler<Any>(this.context, clock)
+ private val scheduler = SimResourceSchedulerTrampoline(this.context, clock)
override val cpus: List<SimProcessingUnit> = model.cpus.map { ProcessingUnitImpl(it) }
@@ -96,7 +95,6 @@ public class SimBareMetalMachine(
override fun close() {
super.close()
- scheduler.close()
scope.cancel()
}
@@ -107,7 +105,7 @@ public class SimBareMetalMachine(
/**
* The actual resource supporting the processing unit.
*/
- private val source = SimResourceSource(model.frequency, clock, scheduler)
+ private val source = SimResourceSource(model.frequency, scheduler)
override val speed: Double
get() = source.speed
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
index fa677de9..11aec2de 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt
@@ -31,13 +31,13 @@ import org.opendc.simulator.resources.*
*
* @param listener The hypervisor listener to use.
*/
-public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
+public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() {
override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true
override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch {
return SimResourceSwitchMaxMin(
- ctx.clock,
+ scheduler,
object : SimResourceSwitchMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceSwitchMaxMin,
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
index 02eb6ad0..2ab3ea09 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt
@@ -22,11 +22,17 @@
package org.opendc.simulator.compute
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation.
*/
public class SimFairShareHypervisorProvider : SimHypervisorProvider {
override val id: String = "fair-share"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimFairShareHypervisor(listener)
+ override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
+ return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener)
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
index a5b4526b..b66020f4 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.compute
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A service provider interface for constructing a [SimHypervisor].
*/
@@ -37,5 +40,5 @@ public interface SimHypervisorProvider {
/**
* Create a [SimHypervisor] instance with the specified [listener].
*/
- public fun create(listener: SimHypervisor.Listener? = null): SimHypervisor
+ public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
index e2044d05..83b924d7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt
@@ -22,11 +22,16 @@
package org.opendc.simulator.compute
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
/**
* A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation.
*/
public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider {
override val id: String = "space-shared"
- override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor()
+ override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor {
+ return SimSpaceSharedHypervisor()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
index a067dd2e..8886caa7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt
@@ -39,6 +39,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.SimResourceSchedulerTrampoline
/**
* Test suite for the [SimHypervisor] class.
@@ -93,7 +94,7 @@ internal class SimHypervisorTest {
)
val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)))
- val hypervisor = SimFairShareHypervisor(listener)
+ val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
launch {
machine.run(hypervisor)
@@ -167,7 +168,7 @@ internal class SimHypervisorTest {
coroutineContext, clock, model, PerformanceScalingGovernor(),
SimpleScalingDriver(ConstantPowerModel(0.0))
)
- val hypervisor = SimFairShareHypervisor(listener)
+ val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener)
launch {
machine.run(hypervisor)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
index beda3eaa..cd5f33bd 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt
@@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import org.opendc.simulator.core.SimulationCoroutineScope
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.utils.TimerScheduler
+import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.openjdk.jmh.annotations.*
import java.util.concurrent.TimeUnit
@@ -37,67 +37,76 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimResourceBenchmarks {
private lateinit var scope: SimulationCoroutineScope
- private lateinit var scheduler: TimerScheduler<Any>
+ private lateinit var scheduler: SimResourceScheduler
@Setup
fun setUp() {
scope = SimulationCoroutineScope()
- scheduler = TimerScheduler(scope.coroutineContext, scope.clock)
+ scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock)
}
@State(Scope.Thread)
class Workload {
- lateinit var consumers: Array<SimResourceConsumer>
+ lateinit var trace: Sequence<SimTraceConsumer.Fragment>
@Setup
fun setUp() {
- consumers = Array(3) { createSimpleConsumer() }
+ trace = sequenceOf(
+ SimTraceConsumer.Fragment(1000, 28.0),
+ SimTraceConsumer.Fragment(1000, 3500.0),
+ SimTraceConsumer.Fragment(1000, 0.0),
+ SimTraceConsumer.Fragment(1000, 183.0),
+ SimTraceConsumer.Fragment(1000, 400.0),
+ SimTraceConsumer.Fragment(1000, 100.0),
+ SimTraceConsumer.Fragment(1000, 3000.0),
+ SimTraceConsumer.Fragment(1000, 4500.0),
+ )
}
}
@Benchmark
fun benchmarkSource(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, clock, scheduler)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ val provider = SimResourceSource(4200.0, scheduler)
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkForwardOverhead(state: Workload) {
return scope.runBlockingSimulation {
- val provider = SimResourceSource(4200.0, clock, scheduler)
+ val provider = SimResourceSource(4200.0, scheduler)
val forwarder = SimResourceForwarder()
provider.startConsumer(forwarder)
- return@runBlockingSimulation forwarder.consume(state.consumers[0])
+ return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
val provider = switch.addOutput(3500.0)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@Benchmark
fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) {
return scope.runBlockingSimulation {
- val switch = SimResourceSwitchMaxMin(clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
repeat(3) { i ->
launch {
val provider = switch.addOutput(3500.0)
- provider.consume(state.consumers[i])
+ provider.consume(SimTraceConsumer(state.trace))
}
}
}
@@ -108,11 +117,11 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
val provider = switch.addOutput(3500.0)
- return@runBlockingSimulation provider.consume(state.consumers[0])
+ return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace))
}
}
@@ -121,13 +130,13 @@ class SimResourceBenchmarks {
return scope.runBlockingSimulation {
val switch = SimResourceSwitchExclusive()
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
- switch.addInput(SimResourceSource(3000.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
+ switch.addInput(SimResourceSource(3000.0, scheduler))
- repeat(2) { i ->
+ repeat(2) {
launch {
val provider = switch.addOutput(3500.0)
- provider.consume(state.consumers[i])
+ provider.consume(SimTraceConsumer(state.trace))
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 1bcaf45f..6ae04f27 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -22,12 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* Abstract implementation of [SimResourceAggregator].
*/
-public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
+public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator {
/**
* This method is invoked when the resource consumer consumes resources.
*/
@@ -77,7 +75,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
protected val outputContext: SimResourceContext
get() = context
- private val context = object : SimAbstractResourceContext(0.0, clock, _output) {
+ private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) {
override val remainingWork: Double
get() {
val now = clock.millis()
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index d2f585b1..c03bfad5 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -31,9 +31,16 @@ import kotlin.math.min
*/
public abstract class SimAbstractResourceContext(
initialCapacity: Double,
- override val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val consumer: SimResourceConsumer
-) : SimResourceContext {
+) : SimResourceContext, SimResourceFlushable {
+
+ /**
+ * The clock of the context.
+ */
+ public override val clock: Clock
+ get() = scheduler.clock
+
/**
* The capacity of the resource.
*/
@@ -143,21 +150,12 @@ public abstract class SimAbstractResourceContext(
flush(isIntermediate = true)
doStop()
- } catch (cause: Throwable) {
- doFail(cause)
} finally {
isProcessing = false
}
}
- /**
- * Flush the current active resource consumption.
- *
- * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
- * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
- * will be asked to deliver a new command and is essentially interrupted.
- */
- public fun flush(isIntermediate: Boolean = false) {
+ override fun flush(isIntermediate: Boolean) {
// Flush is no-op when the consumer is finished or not yet started
if (state != SimResourceState.Active) {
return
@@ -226,7 +224,7 @@ public abstract class SimAbstractResourceContext(
return
}
- flush()
+ scheduler.schedule(this, isIntermediate = false)
}
override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]"
@@ -234,7 +232,7 @@ public abstract class SimAbstractResourceContext(
/**
* A flag to indicate that the resource is currently processing a command.
*/
- protected var isProcessing: Boolean = false
+ private var isProcessing: Boolean = false
/**
* The current command that is being processed.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 5d550ad8..5665abd1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -22,12 +22,10 @@
package org.opendc.simulator.resources
-import java.time.Clock
-
/**
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
-public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
+public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) {
private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 8128c98b..a76cb1e3 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources
-import java.time.Clock
import kotlin.math.max
import kotlin.math.min
@@ -31,7 +30,7 @@ import kotlin.math.min
*/
public class SimResourceDistributorMaxMin(
override val input: SimResourceProvider,
- private val clock: Clock,
+ private val scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceDistributor {
override val outputs: Set<SimResourceProvider>
@@ -220,7 +219,7 @@ public class SimResourceDistributorMaxMin(
}
}
- assert(deadline >= clock.millis()) { "Deadline already passed" }
+ assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" }
this.totalRequestedSpeed = totalRequestedSpeed
this.totalRequestedWork = totalRequestedWork
@@ -337,7 +336,7 @@ public class SimResourceDistributorMaxMin(
private inner class OutputContext(
private val provider: OutputProvider,
consumer: SimResourceConsumer
- ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> {
+ ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> {
/**
* The current command that is processed by the vCPU.
*/
@@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin(
}
}
+ private var isProcessing: Boolean = false
+
override fun interrupt() {
// Prevent users from interrupting the CPU while it is constructing its next command, this will only lead
// to infinite recursion.
@@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin(
return
}
- super.interrupt()
+ try {
+ isProcessing = false
- // Force the scheduler to re-schedule
- schedule()
+ super.interrupt()
+
+ // Force the scheduler to re-schedule
+ schedule()
+ } finally {
+ isProcessing = true
+ }
}
override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed)
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
index 8d2587b1..f6a1a42e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt
@@ -22,22 +22,16 @@
package org.opendc.simulator.resources
-import org.opendc.simulator.resources.consumer.SimTraceConsumer
-
/**
- * Helper function to create simple consumer workload.
+ * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer.
*/
-fun createSimpleConsumer(): SimResourceConsumer {
- return SimTraceConsumer(
- sequenceOf(
- SimTraceConsumer.Fragment(1000, 28.0),
- SimTraceConsumer.Fragment(1000, 3500.0),
- SimTraceConsumer.Fragment(1000, 0.0),
- SimTraceConsumer.Fragment(1000, 183.0),
- SimTraceConsumer.Fragment(1000, 400.0),
- SimTraceConsumer.Fragment(1000, 100.0),
- SimTraceConsumer.Fragment(1000, 3000.0),
- SimTraceConsumer.Fragment(1000, 4500.0),
- ),
- )
+public interface SimResourceFlushable {
+ /**
+ * Flush the current active resource consumption.
+ *
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun flush(isIntermediate: Boolean)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
new file mode 100644
index 00000000..a228c47b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import java.time.Clock
+
+/**
+ * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource
+ * providers and consumers.
+ *
+ * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more
+ * efficiently, reducing the overall work needed per update.
+ */
+public interface SimResourceScheduler {
+ /**
+ * The [Clock] associated with this scheduler.
+ */
+ public val clock: Clock
+
+ /**
+ * Schedule a direct interrupt for the resource context represented by [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false)
+
+ /**
+ * Schedule an interrupt in the future for the resource context represented by [flushable].
+ *
+ * This method will override earlier calls to this method for the same [flushable].
+ *
+ * @param flushable The resource context that needs to be flushed.
+ * @param timestamp The timestamp when the interrupt should happen.
+ * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be
+ * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer
+ * will be asked to deliver a new command and is essentially interrupted.
+ */
+ public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false)
+
+ /**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+ public fun batch(block: () -> Unit)
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
new file mode 100644
index 00000000..cdbb4a6c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+import org.opendc.utils.TimerScheduler
+import java.time.Clock
+import java.util.ArrayDeque
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after.
+ *
+ * @param clock The virtual simulation clock.
+ */
+public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler {
+ /**
+ * The [TimerScheduler] to actually schedule the interrupts.
+ */
+ private val timers = TimerScheduler<Any>(context, clock)
+
+ /**
+ * A flag to indicate that an interrupt is currently running already.
+ */
+ private var isRunning: Boolean = false
+
+ /**
+ * The queue of resources to be flushed.
+ */
+ private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>()
+
+ override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) {
+ queue.add(flushable to isIntermediate)
+
+ if (isRunning) {
+ return
+ }
+
+ flush()
+ }
+
+ override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) {
+ timers.startSingleTimerTo(flushable, timestamp) {
+ schedule(flushable, isIntermediate)
+ }
+ }
+
+ override fun batch(block: () -> Unit) {
+ val wasAlreadyRunning = isRunning
+ try {
+ isRunning = true
+ block()
+ } finally {
+ if (!wasAlreadyRunning) {
+ isRunning = false
+ }
+ }
+ }
+
+ /**
+ * Flush the scheduled queue.
+ */
+ private fun flush() {
+ val visited = mutableSetOf<SimResourceFlushable>()
+ try {
+ isRunning = true
+ while (queue.isNotEmpty()) {
+ val (flushable, isIntermediate) = queue.poll()
+ flushable.flush(isIntermediate)
+ visited.add(flushable)
+ }
+ } finally {
+ isRunning = false
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index fe569096..3277b889 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import org.opendc.utils.TimerScheduler
-import java.time.Clock
import kotlin.math.ceil
import kotlin.math.min
@@ -31,13 +29,11 @@ import kotlin.math.min
* A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity.
*
* @param initialCapacity The initial capacity of the resource.
- * @param clock The virtual clock to track simulation time.
* @param scheduler The scheduler to schedule the interrupts.
*/
public class SimResourceSource(
initialCapacity: Double,
- private val clock: Clock,
- private val scheduler: TimerScheduler<Any>
+ private val scheduler: SimResourceScheduler
) : SimResourceProvider {
/**
* The current processing speed of the resource.
@@ -96,22 +92,21 @@ public class SimResourceSource(
/**
* Internal implementation of [SimResourceContext] for this class.
*/
- private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
+ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) {
override fun onIdle(deadline: Long) {
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
- scheduler.startSingleTimerTo(this, deadline) { flush() }
+ scheduler.schedule(this, deadline)
}
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
val until = min(deadline, clock.millis() + getDuration(work, speed))
-
- scheduler.startSingleTimerTo(this, until, ::flush)
+ scheduler.schedule(this, until)
}
override fun onFinish() {
- scheduler.cancel(this)
+ cancel()
ctx = null
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
index c796c251..5dc1e68d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt
@@ -23,14 +23,13 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.*
-import java.time.Clock
/**
* A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min
* fair sharing.
*/
public class SimResourceSwitchMaxMin(
- clock: Clock,
+ scheduler: SimResourceScheduler,
private val listener: Listener? = null
) : SimResourceSwitch {
private val _outputs = mutableSetOf<SimResourceProvider>()
@@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin(
/**
* The aggregator to aggregate the resources.
*/
- private val aggregator = SimResourceAggregatorMaxMin(clock)
+ private val aggregator = SimResourceAggregatorMaxMin(scheduler)
/**
* The distributor to distribute the aggregated resources.
*/
private val distributor = SimResourceDistributorMaxMin(
- aggregator.output, clock,
+ aggregator.output, scheduler,
object : SimResourceDistributorMaxMin.Listener {
override fun onSliceFinish(
switch: SimResourceDistributor,
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index e78bcdac..2b32300e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceAggregatorMaxMin] class.
@@ -42,19 +41,19 @@ import org.opendc.utils.TimerScheduler
internal class SimResourceAggregatorMaxMinTest {
@Test
fun testSingleCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val forwarder = SimResourceForwarder()
val sources = listOf(
forwarder,
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(1.0, 0.5)
val usage = mutableListOf<Double>()
- val source = SimResourceSource(1.0, clock, scheduler)
+ val source = SimResourceSource(1.0, scheduler)
val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
source.startConsumer(adapter)
@@ -73,12 +72,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testDoubleCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -100,12 +99,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testOvercommit() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -127,12 +126,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testException() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -152,12 +151,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
@@ -177,12 +176,12 @@ internal class SimResourceAggregatorMaxMinTest {
@Test
fun testFailOverCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
- val aggregator = SimResourceAggregatorMaxMin(clock)
+ val aggregator = SimResourceAggregatorMaxMin(scheduler)
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
- SimResourceSource(1.0, clock, scheduler)
+ SimResourceSource(1.0, scheduler),
+ SimResourceSource(1.0, scheduler)
)
sources.forEach(aggregator::addInput)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 8c15ec71..2e2d6588 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -34,24 +34,26 @@ import org.opendc.simulator.core.runBlockingSimulation
class SimResourceContextTest {
@Test
fun testFlushWithoutCommand() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
override fun onFinish() {}
}
- context.flush()
+ context.flush(isIntermediate = false)
}
@Test
fun testIntermediateFlush() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -66,10 +68,11 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -89,10 +92,11 @@ class SimResourceContextTest {
@Test
fun testDoubleStart() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit
- val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
override fun onIdle(deadline: Long) {}
override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
@@ -101,4 +105,43 @@ class SimResourceContextTest {
context.start()
assertThrows<IllegalStateException> { context.start() }
}
+
+ @Test
+ fun testIdempodentCapacityChange() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit
+
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish() {}
+ }
+
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
+ }
+
+ @Test
+ fun testFailureNoInfiniteLoop() = runBlockingSimulation {
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
+ every { consumer.onNext(any()) } returns SimResourceCommand.Exit
+ every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent")
+ every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure")
+
+ val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) {
+ override fun onIdle(deadline: Long) {}
+ override fun onConsume(work: Double, limit: Double, deadline: Long) {}
+ override fun onFinish() {}
+ }
+
+ context.start()
+
+ delay(1)
+
+ verify(exactly = 1) { consumer.onFailure(any(), any()) }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 361a1516..5e86088d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimResourceSource] class.
@@ -41,9 +40,9 @@ import org.opendc.utils.TimerScheduler
class SimResourceSourceTest {
@Test
fun testSpeed() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -58,15 +57,14 @@ class SimResourceSourceTest {
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testAdjustCapacity() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
@@ -79,16 +77,15 @@ class SimResourceSourceTest {
assertEquals(3000, clock.millis())
verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testSpeedLimit() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -103,7 +100,6 @@ class SimResourceSourceTest {
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
- scheduler.close()
provider.close()
}
}
@@ -114,9 +110,9 @@ class SimResourceSourceTest {
*/
@Test
fun testIntermediateInterrupt() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = object : SimResourceConsumer {
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
@@ -131,16 +127,15 @@ class SimResourceSourceTest {
try {
provider.consume(consumer)
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testInterrupt() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
lateinit var resCtx: SimResourceContext
val consumer = object : SimResourceConsumer {
@@ -173,16 +168,15 @@ class SimResourceSourceTest {
assertEquals(0, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testFailure() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
@@ -193,16 +187,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testExceptionPropagationOnNext() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -214,16 +207,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testConcurrentConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -238,16 +230,15 @@ class SimResourceSourceTest {
}
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testClosedConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -260,16 +251,15 @@ class SimResourceSourceTest {
provider.consume(consumer)
}
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testCloseDuringConsumption() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -283,16 +273,15 @@ class SimResourceSourceTest {
assertEquals(500, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@Test
fun testIdle() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -304,7 +293,6 @@ class SimResourceSourceTest {
assertEquals(500, clock.millis())
} finally {
- scheduler.close()
provider.close()
}
}
@@ -313,9 +301,9 @@ class SimResourceSourceTest {
fun testInfiniteSleep() {
assertThrows<IllegalStateException> {
runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -325,7 +313,6 @@ class SimResourceSourceTest {
try {
provider.consume(consumer)
} finally {
- scheduler.close()
provider.close()
}
}
@@ -334,9 +321,9 @@ class SimResourceSourceTest {
@Test
fun testIncorrectDeadline() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val capacity = 4200.0
- val provider = SimResourceSource(capacity, clock, scheduler)
+ val provider = SimResourceSource(capacity, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
@@ -348,7 +335,6 @@ class SimResourceSourceTest {
assertThrows<IllegalArgumentException> { provider.consume(consumer) }
} finally {
- scheduler.close()
provider.close()
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index 1b1f7790..32b6d8ad 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceSwitchExclusive] class.
@@ -45,7 +44,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val speed = mutableListOf<Double>()
@@ -61,7 +60,7 @@ internal class SimResourceSwitchExclusiveTest {
)
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
val forwarder = SimResourceForwarder()
val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
source.startConsumer(adapter)
@@ -87,14 +86,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testRuntimeWorkload() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
@@ -114,7 +113,7 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testTwoWorkloads() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = object : SimResourceConsumer {
@@ -138,7 +137,7 @@ internal class SimResourceSwitchExclusiveTest {
}
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
@@ -159,14 +158,14 @@ internal class SimResourceSwitchExclusiveTest {
*/
@Test
fun testConcurrentWorkloadFails() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val duration = 5 * 60L * 1000
val workload = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit
val switch = SimResourceSwitchExclusive()
- val source = SimResourceSource(3200.0, clock, scheduler)
+ val source = SimResourceSource(3200.0, scheduler)
switch.addInput(source)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
index 7416f277..e7dec172 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimTraceConsumer
-import org.opendc.utils.TimerScheduler
/**
* Test suite for the [SimResourceSwitch] implementations
@@ -41,10 +40,10 @@ import org.opendc.utils.TimerScheduler
internal class SimResourceSwitchMaxMinTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val switch = SimResourceSwitchMaxMin(clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val switch = SimResourceSwitchMaxMin(scheduler)
- val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) }
+ val sources = List(2) { SimResourceSource(2000.0, scheduler) }
sources.forEach { switch.addInput(it) }
val provider = switch.addOutput(1000.0)
@@ -57,7 +56,6 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
switch.close()
- scheduler.close()
}
}
@@ -66,7 +64,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedSingle() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
@@ -99,16 +97,15 @@ internal class SimResourceSwitchMaxMinTest {
),
)
- val switch = SimResourceSwitchMaxMin(clock, listener)
+ val switch = SimResourceSwitchMaxMin(scheduler, listener)
val provider = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, scheduler))
provider.consume(workload)
yield()
} finally {
switch.close()
- scheduler.close()
}
assertAll(
@@ -124,7 +121,7 @@ internal class SimResourceSwitchMaxMinTest {
*/
@Test
fun testOvercommittedDual() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
val listener = object : SimResourceSwitchMaxMin.Listener {
var totalRequestedWork = 0L
@@ -166,12 +163,12 @@ internal class SimResourceSwitchMaxMinTest {
)
)
- val switch = SimResourceSwitchMaxMin(clock, listener)
+ val switch = SimResourceSwitchMaxMin(scheduler, listener)
val providerA = switch.addOutput(3200.0)
val providerB = switch.addOutput(3200.0)
try {
- switch.addInput(SimResourceSource(3200.0, clock, scheduler))
+ switch.addInput(SimResourceSource(3200.0, scheduler))
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -181,7 +178,6 @@ internal class SimResourceSwitchMaxMinTest {
yield()
} finally {
switch.close()
- scheduler.close()
}
assertAll(
{ assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") },
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index e3ca5845..880e1755 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimResourceTransformer] class.
@@ -42,8 +41,8 @@ internal class SimResourceTransformerTest {
@Test
fun testExitImmediately() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
launch {
source.consume(forwarder)
@@ -57,14 +56,13 @@ internal class SimResourceTransformerTest {
})
forwarder.close()
- scheduler.close()
}
@Test
fun testExit() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
launch {
source.consume(forwarder)
@@ -124,8 +122,8 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelStartedDelegate() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
@@ -143,8 +141,8 @@ internal class SimResourceTransformerTest {
@Test
fun testCancelPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10)
@@ -162,8 +160,8 @@ internal class SimResourceTransformerTest {
@Test
fun testExitPropagation() = runBlockingSimulation {
val forwarder = SimResourceForwarder(isCoupled = true)
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(2000.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(2000.0, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) } returns SimResourceCommand.Exit
@@ -178,8 +176,8 @@ internal class SimResourceTransformerTest {
@Test
fun testAdjustCapacity() = runBlockingSimulation {
val forwarder = SimResourceForwarder()
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
source.startConsumer(forwarder)
@@ -197,8 +195,8 @@ internal class SimResourceTransformerTest {
@Test
fun testTransformExit() = runBlockingSimulation {
val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit }
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val source = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val source = SimResourceSource(1.0, scheduler)
val consumer = spyk(SimWorkConsumer(2.0, 1.0))
source.startConsumer(forwarder)
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
index bf58b1b6..ac8b5814 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.consumer.SimWorkConsumer
-import org.opendc.utils.TimerScheduler
/**
* A test suite for the [SimWorkConsumer] class.
@@ -36,8 +35,8 @@ import org.opendc.utils.TimerScheduler
internal class SimWorkConsumerTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 1.0)
@@ -51,8 +50,8 @@ internal class SimWorkConsumerTest {
@Test
fun testUtilization() = runBlockingSimulation {
- val scheduler = TimerScheduler<Any>(coroutineContext, clock)
- val provider = SimResourceSource(1.0, clock, scheduler)
+ val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock)
+ val provider = SimResourceSource(1.0, scheduler)
val consumer = SimWorkConsumer(1.0, 0.5)