summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-07 21:59:13 +0200
committerGitHub <noreply@github.com>2021-04-07 21:59:13 +0200
commit5d3b759b18fb0a4278b43dea6a9db478b07804a5 (patch)
tree419dedda10f6a1f1865fbee4d1f546dd8876c940 /simulator/opendc-simulator
parent519141f9af525a853b40eb821e70ca209bc104bf (diff)
parent3d707674ddfa96ae5c090a7c918350b0bef9b50f (diff)
simulator: Optimize bottlenecks in resource layer
This pull request addresses several bottlenecks that were present in the `opendc-simulator-resources` layer and `TimerScheduler`. These changes result into a 4x performance improvement for the energy experiments we are currently doing. * The use of `StateFlow` has been removed where possible. Profiling shows that emitting changes to `StateFlow` becomes a bottleneck in a single-thread context. * `SimSpeedConsumerAdapter` is an alternative for obtaining the changes in speed of a resource. **Breaking API Changes** * `SimResourceSource` does not expose `speed` as `StateFlow` anymore. To monitor speed changes, use `SimSpeedConsumerAdapter`. * Power draw in `SimBareMetalMachine` is not exposed as `StateFlow` anymore.
Diffstat (limited to 'simulator/opendc-simulator')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt38
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt15
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt8
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt13
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt10
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt47
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt17
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt12
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt11
16 files changed, 117 insertions, 84 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 2127b066..1f26c9c9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -25,14 +25,13 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceProvider
import org.opendc.simulator.resources.SimResourceSource
import org.opendc.simulator.resources.consume
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -47,9 +46,9 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
/**
* The speed of the CPU cores.
*/
- public val speed: List<Double>
+ public val speed: DoubleArray
get() = _speed
- private var _speed = mutableListOf<Double>()
+ private var _speed = doubleArrayOf()
/**
* A flag to indicate that the machine is terminated.
@@ -94,29 +93,32 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
val ctx = Context(resources, meta)
val totalCapacity = model.cpus.sumByDouble { it.frequency }
- _speed = MutableList(model.cpus.size) { 0.0 }
+ _speed = DoubleArray(model.cpus.size) { 0.0 }
+ var totalSpeed = 0.0
workload.onStart(ctx)
for ((cpu, source) in resources) {
val consumer = workload.getConsumer(ctx, cpu)
- val job = source.speed
- .onEach {
- _speed[cpu.id] = it
- _usage.value = _speed.sum() / totalCapacity
- }
- .launchIn(this)
-
- launch {
- try {
- source.consume(consumer)
- } finally {
- job.cancel()
- }
+ val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed ->
+ val oldSpeed = _speed[cpu.id]
+ _speed[cpu.id] = newSpeed
+ totalSpeed = totalSpeed - oldSpeed + newSpeed
+
+ updateUsage(totalSpeed / totalCapacity)
}
+
+ launch { source.consume(adapter) }
}
}
+ /**
+ * This method is invoked when the usage of the machine is updated.
+ */
+ protected open fun updateUsage(usage: Double) {
+ _usage.value = usage
+ }
+
override fun close() {
if (!isTerminated) {
isTerminated = true
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 51b807d2..d5577279 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -84,12 +84,15 @@ public class SimBareMetalMachine(
/**
* The power draw of the machine.
*/
- public val powerDraw: StateFlow<Double> = usage
- .map {
- this.scalingGovernors.forEach { it.onLimit() }
- this.scalingDriver.computePower()
- }
- .stateIn(scope, SharingStarted.Eagerly, 0.0)
+ public var powerDraw: Double = 0.0
+ private set
+
+ override fun updateUsage(usage: Double) {
+ super.updateUsage(usage)
+
+ scalingGovernors.forEach { it.onLimit() }
+ powerDraw = scalingDriver.computePower()
+ }
override fun close() {
super.close()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
index b4bbf9fb..4d62c383 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
@@ -28,7 +28,7 @@ package org.opendc.simulator.compute.cpufreq
public class DemandScalingGovernor : ScalingGovernor {
override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
override fun onLimit() {
- ctx.setTarget(ctx.resource.speed.value)
+ ctx.setTarget(ctx.resource.speed)
}
override fun toString(): String = "DemandScalingGovernor.Logic"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
index d109e4d8..1c82253c 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
@@ -59,7 +59,7 @@ public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDrive
for (ctx in contexts) {
targetFreq = max(ctx.target, targetFreq)
- totalSpeed += ctx.resource.speed.value
+ totalSpeed += ctx.resource.speed
}
val maxFreq = states.lastKey()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
index 19c06126..c02b6285 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
@@ -35,7 +35,7 @@ internal class DemandScalingGovernorTest {
fun testSetDemandLimit() {
val ctx = mockk<ScalingContext>(relaxUnitFun = true)
- every { ctx.resource.speed.value } returns 2100.0
+ every { ctx.resource.speed } returns 2100.0
val logic = DemandScalingGovernor().createLogic(ctx)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
index 5c30bc1f..c6f233a6 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
@@ -59,7 +59,7 @@ internal class PStateScalingDriverTest {
val resource = mockk<SimResourceSource>()
every { cpu.frequency } returns 4100.0
- every { resource.speed.value } returns 1200.0
+ every { resource.speed } returns 1200.0
val driver = PStateScalingDriver(
sortedMapOf(
@@ -84,7 +84,7 @@ internal class PStateScalingDriverTest {
val resource = mockk<SimResourceSource>()
every { cpu.frequency } returns 4100.0
- every { resource.speed.value } returns 1200.0
+ every { resource.speed } returns 1200.0
val driver = PStateScalingDriver(
sortedMapOf(
@@ -125,11 +125,11 @@ internal class PStateScalingDriverTest {
val scalingContext = logic.createContext(cpu, resource)
- every { resource.speed.value } returns 1400.0
+ every { resource.speed } returns 1400.0
scalingContext.setTarget(1400.0)
assertEquals(150.0, logic.computePower())
- every { resource.speed.value } returns 1400.0
+ every { resource.speed } returns 1400.0
scalingContext.setTarget(4000.0)
assertEquals(235.0, logic.computePower())
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index e5991264..c7fa6a17 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -102,7 +102,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
override val remainingWork: Double
- get() = inputContexts.sumByDouble { it.remainingWork }
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
override fun interrupt() {
super.interrupt()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 5c5ee038..05ed0714 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -272,6 +272,7 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = 0.0
+ consumer.onConfirm(this, 0.0)
onIdle(deadline)
}
@@ -283,6 +284,7 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = min(capacity, limit)
+ consumer.onConfirm(this, speed)
onConsume(work, limit, deadline)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 672a3e9d..38672b13 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.resources
/**
- * A [SimResourceConsumer] characterizes how a [SimResource] is consumed.
+ * A [SimResourceConsumer] characterizes how a resource is consumed.
*
* Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
* for multiple resource providers, unless explicitly said otherwise.
@@ -46,6 +46,14 @@ public interface SimResourceConsumer {
public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
+ * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param speed The speed at which the consumer runs.
+ */
+ public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+
+ /**
* This is method is invoked when the capacity of the resource changes.
*
* After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 9df333e3..dfdd2c2e 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -106,7 +106,7 @@ public class SimResourceDistributorMaxMin(
val output = iterator.next()
// Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call tou output.close()
+ // during the call to output.close()
iterator.remove()
output.close()
@@ -251,8 +251,8 @@ public class SimResourceDistributorMaxMin(
totalAllocatedWork - totalRemainingWork,
totalOvercommittedWork.toLong(),
totalInterferedWork.toLong(),
- totalRequestedSpeed,
totalAllocatedSpeed,
+ totalRequestedSpeed
)
totalInterferedWork = 0.0
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 9b10edaf..025b0406 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
import kotlin.math.ceil
@@ -42,11 +40,10 @@ public class SimResourceSource(
private val scheduler: TimerScheduler<Any>
) : SimResourceProvider {
/**
- * The resource processing speed over time.
+ * The current processing speed of the resource.
*/
- public val speed: StateFlow<Double>
- get() = _speed
- private val _speed = MutableStateFlow(0.0)
+ public val speed: Double
+ get() = ctx?.speed ?: 0.0
/**
* The capacity of the resource.
@@ -101,8 +98,6 @@ public class SimResourceSource(
*/
private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
override fun onIdle(deadline: Long) {
- _speed.value = speed
-
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
scheduler.startSingleTimerTo(this, deadline) { flush() }
@@ -110,15 +105,12 @@ public class SimResourceSource(
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
- _speed.value = speed
-
val until = min(deadline, clock.millis() + getDuration(work, speed))
scheduler.startSingleTimerTo(this, until, ::flush)
}
override fun onFinish(cause: Throwable?) {
- _speed.value = speed
scheduler.cancel(this)
cancel()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 73f18c7c..de455021 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -124,6 +124,10 @@ public class SimResourceTransformer(
}
}
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate?.onConfirm(ctx, speed)
+ }
+
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
delegate?.onCapacityChanged(ctx, isThrottled)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index fd4a9ed5..114c7312 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources.consumer
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -32,37 +30,52 @@ import kotlin.math.min
/**
* Helper class to expose an observable [speed] field describing the speed of the consumer.
*/
-public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate {
+public class SimSpeedConsumerAdapter(
+ private val delegate: SimResourceConsumer,
+ private val callback: (Double) -> Unit = {}
+) : SimResourceConsumer by delegate {
/**
- * The resource processing speed over time.
+ * The resource processing speed at this instant.
*/
- public val speed: StateFlow<Double>
- get() = _speed
- private val _speed = MutableStateFlow(0.0)
+ public var speed: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val command = delegate.onNext(ctx)
+ return delegate.onNext(ctx)
+ }
- when (command) {
- is SimResourceCommand.Idle -> _speed.value = 0.0
- is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit)
- is SimResourceCommand.Exit -> _speed.value = 0.0
- }
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate.onConfirm(ctx, speed)
- return command
+ this.speed = speed
}
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- val oldSpeed = _speed.value
+ val oldSpeed = speed
delegate.onCapacityChanged(ctx, isThrottled)
// Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
// need to update the current speed.
- if (oldSpeed == _speed.value) {
- _speed.value = min(ctx.capacity, _speed.value)
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
}
}
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ speed = 0.0
+ }
+
override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index de864c1c..bf8c6d1f 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -26,12 +26,12 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -47,15 +47,18 @@ internal class SimResourceAggregatorMaxMinTest {
val scheduler = TimerScheduler<Any>(coroutineContext, clock)
val aggregator = SimResourceAggregatorMaxMin(clock)
+ val forwarder = SimResourceForwarder()
val sources = listOf(
- SimResourceSource(1.0, clock, scheduler),
+ forwarder,
SimResourceSource(1.0, clock, scheduler)
)
sources.forEach(aggregator::addInput)
val consumer = SimWorkConsumer(1.0, 0.5)
val usage = mutableListOf<Double>()
- val job = launch { sources[0].speed.toList(usage) }
+ val source = SimResourceSource(1.0, clock, scheduler)
+ val adapter = SimSpeedConsumerAdapter(forwarder, usage::add)
+ source.startConsumer(adapter)
try {
aggregator.output.consume(consumer)
@@ -67,7 +70,6 @@ internal class SimResourceAggregatorMaxMinTest {
)
} finally {
aggregator.output.close()
- job.cancel()
}
}
@@ -85,18 +87,17 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = SimWorkConsumer(2.0, 1.0)
val usage = mutableListOf<Double>()
- val job = launch { sources[0].speed.toList(usage) }
+ val adapter = SimSpeedConsumerAdapter(consumer, usage::add)
try {
- aggregator.output.consume(consumer)
+ aggregator.output.consume(adapter)
yield()
assertAll(
{ assertEquals(1000, currentTime) },
- { assertEquals(listOf(0.0, 1.0, 0.0), usage) }
+ { assertEquals(listOf(0.0, 2.0, 0.0), usage) }
)
} finally {
aggregator.output.close()
- job.cancel()
}
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 58e19421..dbba6160 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -27,10 +27,10 @@ import io.mockk.mockk
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -54,11 +54,10 @@ class SimResourceSourceTest {
try {
val res = mutableListOf<Double>()
- val job = launch { provider.speed.toList(res) }
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
- provider.consume(consumer)
+ provider.consume(adapter)
- job.cancel()
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
scheduler.close()
@@ -102,11 +101,10 @@ class SimResourceSourceTest {
try {
val res = mutableListOf<Double>()
- val job = launch { provider.speed.toList(res) }
+ val adapter = SimSpeedConsumerAdapter(consumer, res::add)
- provider.consume(consumer)
+ provider.consume(adapter)
- job.cancel()
assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
} finally {
scheduler.close()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index edd60502..9a40edc4 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -25,14 +25,13 @@ package org.opendc.simulator.resources
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.flow.toList
-import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import org.opendc.simulator.resources.consumer.SimTraceConsumer
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.utils.TimerScheduler
@@ -65,17 +64,17 @@ internal class SimResourceSwitchExclusiveTest {
val switch = SimResourceSwitchExclusive()
val source = SimResourceSource(3200.0, clock, scheduler)
-
- switch.addInput(source)
+ val forwarder = SimResourceForwarder()
+ val adapter = SimSpeedConsumerAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ switch.addInput(forwarder)
val provider = switch.addOutput(3200.0)
- val job = launch { source.speed.toList(speed) }
try {
provider.consume(workload)
yield()
} finally {
- job.cancel()
provider.close()
}