diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-28 22:10:12 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:37 +0200 |
| commit | d031a70f8bea02a86df7840c5ce731185df86883 (patch) | |
| tree | 0eafce9c17be7d0818702c5b66cc9e568ab08981 /opendc-simulator/opendc-simulator-resources | |
| parent | d3b0b551362eb677c12047cba82a6279ea4608b4 (diff) | |
refactor(simulator): Invoke consumer callback on every invalidation
This change updates the simulator implementation to always invoke the
`SimResourceConsumer.onNext` callback when the resource context is
invalidated. This allows users to update the resource counter or do some
other work if the context has changed.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
3 files changed, 35 insertions, 52 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index ad6b0108..4c0e075c 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -31,12 +31,20 @@ import org.opendc.simulator.resources.SimResourceEvent * consumption for some period of time. */ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { - private var iterator: Iterator<Fragment>? = null + private var _iterator: Iterator<Fragment>? = null + private var _nextTarget = Long.MIN_VALUE override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - val iterator = checkNotNull(iterator) + // Check whether the trace fragment was fully consumed, otherwise wait until we have done so + val nextTarget = _nextTarget + if (nextTarget > now) { + return now - nextTarget + } + + val iterator = checkNotNull(_iterator) return if (iterator.hasNext()) { val fragment = iterator.next() + _nextTarget = now + fragment.duration ctx.push(fragment.usage) fragment.duration } else { @@ -48,11 +56,11 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { when (event) { SimResourceEvent.Start -> { - check(iterator == null) { "Consumer already running" } - iterator = trace.iterator() + check(_iterator == null) { "Consumer already running" } + _iterator = trace.iterator() } SimResourceEvent.Exit -> { - iterator = null + _iterator = null } else -> {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 9cbf849d..1ac38946 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -148,6 +148,10 @@ internal class SimResourceContextImpl( } override fun push(rate: Double) { + if (_limit == rate) { + return + } + _speed = min(capacity, rate) _limit = rate } @@ -163,7 +167,7 @@ internal class SimResourceContextImpl( /** * Update the state of the resource context. */ - fun doUpdate(timestamp: Long) { + fun doUpdate(now: Long) { val oldState = _state if (oldState != SimResourceState.Active) { return @@ -171,41 +175,29 @@ internal class SimResourceContextImpl( _updateActive = true - val flag = _flag - val isInterrupted = flag and FLAG_INTERRUPT != 0 - val reachedDeadline = _deadline <= timestamp - val delta = max(0, timestamp - _timestamp) + val reachedDeadline = _deadline <= now + val delta = max(0, now - _timestamp) try { - // Update the resource counters only if there is some progress - if (timestamp > _timestamp) { + if (now > _timestamp) { logic.onUpdate(this, delta, _limit, reachedDeadline) } - // We should only continue processing the next command if: - // 1. The resource consumption was finished. - // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) - val duration = if (reachedDeadline || isInterrupted) { - consumer.onNext(this, timestamp, delta) - } else { - _deadline - timestamp - } + val duration = consumer.onNext(this, now, delta) // Reset update flags _flag = 0 when (_state) { SimResourceState.Active -> { - val limit = _limit - push(limit) - _duration = duration - - val target = logic.onConsume(this, timestamp, limit, duration) + val target = logic.onConsume(this, now, _limit, duration) + _speed = min(capacity, _limit) + _duration = duration _deadline = target - scheduleUpdate(timestamp, target) + scheduleUpdate(now, target) } SimResourceState.Pending -> if (oldState != SimResourceState.Pending) { @@ -219,7 +211,7 @@ internal class SimResourceContextImpl( } catch (cause: Throwable) { doFail(cause) } finally { - _timestamp = timestamp + _timestamp = now _updateActive = false } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index e95e9e42..c7230a0e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -26,6 +26,7 @@ import io.mockk.* import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.consumer.SimWorkConsumer import org.opendc.simulator.resources.impl.SimResourceContextImpl import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl @@ -57,23 +58,14 @@ class SimResourceContextTest { @Test fun testIntermediateFlush() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(4.0) - 1000 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } + val consumer = SimWorkConsumer(1.0, 1.0) val logic = spyk(object : SimResourceProviderLogic { override fun onFinish(ctx: SimResourceControllableContext) {} override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration }) val context = SimResourceContextImpl(null, interpreter, consumer, logic) + context.capacity = 1.0 context.start() delay(1) // Delay 1 ms to prevent hitting the fast path @@ -85,29 +77,20 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingSimulation { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) - val consumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return if (now == 0L) { - ctx.push(0.0) - 10 - } else { - ctx.close() - Long.MAX_VALUE - } - } - } + val consumer = SimWorkConsumer(1.0, 1.0) val logic = spyk(object : SimResourceProviderLogic {}) val context = SimResourceContextImpl(null, interpreter, consumer, logic) + context.capacity = 1.0 context.start() - delay(5) + delay(500) context.invalidate() - delay(5) + delay(500) context.invalidate() assertAll( - { verify(exactly = 2) { logic.onConsume(any(), any(), 0.0, any()) } }, + { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } }, { verify(exactly = 1) { logic.onFinish(any()) } } ) } |
