From 4e9f72b50473d73f9ca9e30a7fbeb97a8a1c0555 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 7 Apr 2021 16:26:00 +0200 Subject: simulator: Move away from StateFlow for low-level monitoring This change removes the StateFlow speed property on the SimResourceSource, as the overhead of emitting changes to the StateFlow is too high in a single-thread context. Our new approach is to use direct callbacks and counters. --- .../resources/SimAbstractResourceAggregator.kt | 13 +++++- .../resources/SimAbstractResourceContext.kt | 2 + .../simulator/resources/SimResourceConsumer.kt | 10 ++++- .../resources/SimResourceDistributorMaxMin.kt | 4 +- .../simulator/resources/SimResourceSource.kt | 14 ++----- .../simulator/resources/SimResourceTransformer.kt | 4 ++ .../resources/consumer/SimSpeedConsumerAdapter.kt | 47 ++++++++++++++-------- .../resources/SimResourceAggregatorMaxMinTest.kt | 17 ++++---- .../simulator/resources/SimResourceSourceTest.kt | 12 +++--- .../resources/SimResourceSwitchExclusiveTest.kt | 11 +++-- 10 files changed, 81 insertions(+), 53 deletions(-) (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src') diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index e5991264..c7fa6a17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -102,7 +102,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double - get() = inputContexts.sumByDouble { it.remainingWork } + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE override fun interrupt() { super.interrupt() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 5c5ee038..05ed0714 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -272,6 +272,7 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = 0.0 + consumer.onConfirm(this, 0.0) onIdle(deadline) } @@ -283,6 +284,7 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = min(capacity, limit) + consumer.onConfirm(this, speed) onConsume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 672a3e9d..38672b13 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A [SimResourceConsumer] characterizes how a [SimResource] is consumed. + * A [SimResourceConsumer] characterizes how a resource is consumed. * * Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently) * for multiple resource providers, unless explicitly said otherwise. @@ -45,6 +45,14 @@ public interface SimResourceConsumer { */ public fun onNext(ctx: SimResourceContext): SimResourceCommand + /** + * This method is invoked when the resource provider confirms that the consumer is running at the given speed. + * + * @param ctx The execution context in which the consumer runs. + * @param speed The speed at which the consumer runs. + */ + public fun onConfirm(ctx: SimResourceContext, speed: Double) {} + /** * This is method is invoked when the capacity of the resource changes. * diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 9df333e3..dfdd2c2e 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -106,7 +106,7 @@ public class SimResourceDistributorMaxMin( val output = iterator.next() // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call tou output.close() + // during the call to output.close() iterator.remove() output.close() @@ -251,8 +251,8 @@ public class SimResourceDistributorMaxMin( totalAllocatedWork - totalRemainingWork, totalOvercommittedWork.toLong(), totalInterferedWork.toLong(), - totalRequestedSpeed, totalAllocatedSpeed, + totalRequestedSpeed ) totalInterferedWork = 0.0 diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 9b10edaf..025b0406 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock import kotlin.math.ceil @@ -42,11 +40,10 @@ public class SimResourceSource( private val scheduler: TimerScheduler ) : SimResourceProvider { /** - * The resource processing speed over time. + * The current processing speed of the resource. */ - public val speed: StateFlow - get() = _speed - private val _speed = MutableStateFlow(0.0) + public val speed: Double + get() = ctx?.speed ?: 0.0 /** * The capacity of the resource. @@ -101,8 +98,6 @@ public class SimResourceSource( */ private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - _speed.value = speed - // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { scheduler.startSingleTimerTo(this, deadline) { flush() } @@ -110,15 +105,12 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - _speed.value = speed - val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - _speed.value = speed scheduler.cancel(this) cancel() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 73f18c7c..de455021 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -124,6 +124,10 @@ public class SimResourceTransformer( } } + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate?.onConfirm(ctx, speed) + } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { delegate?.onCapacityChanged(ctx, isThrottled) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index fd4a9ed5..114c7312 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources.consumer -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext @@ -32,37 +30,52 @@ import kotlin.math.min /** * Helper class to expose an observable [speed] field describing the speed of the consumer. */ -public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate { +public class SimSpeedConsumerAdapter( + private val delegate: SimResourceConsumer, + private val callback: (Double) -> Unit = {} +) : SimResourceConsumer by delegate { /** - * The resource processing speed over time. + * The resource processing speed at this instant. */ - public val speed: StateFlow - get() = _speed - private val _speed = MutableStateFlow(0.0) + public var speed: Double = 0.0 + private set(value) { + if (field != value) { + callback(value) + field = value + } + } + + init { + callback(0.0) + } override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val command = delegate.onNext(ctx) + return delegate.onNext(ctx) + } - when (command) { - is SimResourceCommand.Idle -> _speed.value = 0.0 - is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit) - is SimResourceCommand.Exit -> _speed.value = 0.0 - } + override fun onConfirm(ctx: SimResourceContext, speed: Double) { + delegate.onConfirm(ctx, speed) - return command + this.speed = speed } override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - val oldSpeed = _speed.value + val oldSpeed = speed delegate.onCapacityChanged(ctx, isThrottled) // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might // need to update the current speed. - if (oldSpeed == _speed.value) { - _speed.value = min(ctx.capacity, _speed.value) + if (oldSpeed == speed) { + speed = min(ctx.capacity, speed) } } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + super.onFinish(ctx, cause) + + speed = 0.0 + } + override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index de864c1c..bf8c6d1f 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -26,12 +26,12 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -47,15 +47,18 @@ internal class SimResourceAggregatorMaxMinTest { val scheduler = TimerScheduler(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(clock) + val forwarder = SimResourceForwarder() val sources = listOf( - SimResourceSource(1.0, clock, scheduler), + forwarder, SimResourceSource(1.0, clock, scheduler) ) sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) val usage = mutableListOf() - val job = launch { sources[0].speed.toList(usage) } + val source = SimResourceSource(1.0, clock, scheduler) + val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) + source.startConsumer(adapter) try { aggregator.output.consume(consumer) @@ -67,7 +70,6 @@ internal class SimResourceAggregatorMaxMinTest { ) } finally { aggregator.output.close() - job.cancel() } } @@ -85,18 +87,17 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(2.0, 1.0) val usage = mutableListOf() - val job = launch { sources[0].speed.toList(usage) } + val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(consumer) + aggregator.output.consume(adapter) yield() assertAll( { assertEquals(1000, currentTime) }, - { assertEquals(listOf(0.0, 1.0, 0.0), usage) } + { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { aggregator.output.close() - job.cancel() } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 58e19421..dbba6160 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -27,10 +27,10 @@ import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -54,11 +54,10 @@ class SimResourceSourceTest { try { val res = mutableListOf() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() @@ -102,11 +101,10 @@ class SimResourceSourceTest { try { val res = mutableListOf() - val job = launch { provider.speed.toList(res) } + val adapter = SimSpeedConsumerAdapter(consumer, res::add) - provider.consume(consumer) + provider.consume(adapter) - job.cancel() assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { scheduler.close() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index edd60502..9a40edc4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.resources import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.yield import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.utils.TimerScheduler @@ -65,17 +64,17 @@ internal class SimResourceSwitchExclusiveTest { val switch = SimResourceSwitchExclusive() val source = SimResourceSource(3200.0, clock, scheduler) - - switch.addInput(source) + val forwarder = SimResourceForwarder() + val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addInput(forwarder) val provider = switch.addOutput(3200.0) - val job = launch { source.speed.toList(speed) } try { provider.consume(workload) yield() } finally { - job.cancel() provider.close() } -- cgit v1.2.3