diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-30 11:55:27 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:39 +0200 |
| commit | 7b2d03add3170b9142bf42c5a64aaa263773caf7 (patch) | |
| tree | 341fbc68154156fbea250db5d29a6e38e7a69fe5 /opendc-simulator/opendc-simulator-flow | |
| parent | 4cc1d40d421c8736f8b21b360b61d6b065158b7a (diff) | |
refactor(simulator): Separate push and pull flags
This change separates the push and pull flags in
FlowConsumerContextImpl, meaning that sources can now push directly
without pulling and vice versa.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
2 files changed, 103 insertions, 99 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9f3afc4d..f62528ed 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -36,12 +36,7 @@ internal class FlowConsumerContextImpl( private val logic: FlowConsumerLogic ) : FlowConsumerContext { /** - * The clock to track simulation time. - */ - private val _clock = engine.clock - - /** - * The capacity of the resource. + * The capacity of the connection. */ override var capacity: Double = 0.0 set(value) { @@ -55,45 +50,56 @@ internal class FlowConsumerContextImpl( } /** - * A flag to indicate the state of the context. - */ - private var _state = State.Pending - - /** - * The current processing speed of the resource. + * The current processing rate of the connection. */ override val rate: Double get() = _rate private var _rate = 0.0 /** - * The current resource processing demand. + * The current flow processing demand. */ override val demand: Double - get() = _limit + get() = _demand + + /** + * The clock to track simulation time. + */ + private val _clock = engine.clock + + /** + * A flag to indicate the state of the connection. + */ + private var _state = State.Pending /** - * The current state of the resource context. + * The current state of the connection. */ - private var _limit: Double = 0.0 - private var _activeLimit: Double = 0.0 - private var _deadline: Long = Long.MIN_VALUE + private var _demand: Double = 0.0 // The current (pending) demand of the source + private var _activeDemand: Double = 0.0 // The previous demand of the source + private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer + + /** + * A flag to indicate that the source should be pulled. + */ + private var _isPulled = false /** * A flag to indicate that an update is active. */ - private var _updateActive = false + private var _isUpdateActive = false /** - * The update flag indicating why the update was triggered. + * A flag to indicate that an immediate update is scheduled. */ - private var _flag: Int = 0 + private var _isImmediateUpdateScheduled = false /** * The timestamp of calls to the callbacks. */ - private var _lastUpdate: Long = Long.MIN_VALUE - private var _lastConvergence: Long = Long.MAX_VALUE + private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` + private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` + private var _lastConvergence: Long = Long.MAX_VALUE // Last call to `onConvergence` /** * The timers at which the context is scheduled to be interrupted. @@ -110,35 +116,35 @@ internal class FlowConsumerContextImpl( } override fun close() { - if (_state == State.Stopped) { + if (_state == State.Closed) { return } engine.batch { - _state = State.Stopped - if (!_updateActive) { + _state = State.Closed + if (!_isUpdateActive) { val now = _clock.millis() - val delta = max(0, now - _lastUpdate) + val delta = max(0, now - _lastPull) doStop(now, delta) // FIX: Make sure the context converges - _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(_clock.millis()) + pull() } } } override fun pull() { - if (_state == State.Stopped) { + if (_state == State.Closed) { return } - _flag = _flag or FLAG_INTERRUPT - scheduleUpdate(_clock.millis()) + _isPulled = true + scheduleImmediate() } override fun flush() { - if (_state == State.Stopped) { + // Do not attempt to flush the connection if the connection is closed or an update is already active + if (_state == State.Closed || _isUpdateActive) { return } @@ -146,26 +152,28 @@ internal class FlowConsumerContextImpl( } override fun push(rate: Double) { - if (_limit == rate) { + if (_demand == rate) { return } - _limit = rate + _demand = rate - // Invalidate only if the active limit is change and no update is active + // Invalidate only if the active demand is changed and no update is active // If an update is active, it will already get picked up at the end of the update - if (_activeLimit != rate && !_updateActive) { - _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(_clock.millis()) + if (_activeDemand != rate && !_isUpdateActive) { + scheduleImmediate() } } /** - * Determine whether the state of the resource context should be updated. + * Determine whether the state of the flow connection should be updated. */ fun shouldUpdate(timestamp: Long): Boolean { - // Either the resource context is flagged or there is a pending update at this timestamp - return _flag != 0 || _limit != _activeLimit || _deadline == timestamp + // The flow connection should be updated for three reasons: + // (1) The source should be pulled (after a call to `pull`) + // (2) The demand of the source has changed (after a call to `push`) + // (3) The timer of the source expired + return _isPulled || _demand != _activeDemand || _deadline == timestamp } /** @@ -177,43 +185,58 @@ internal class FlowConsumerContextImpl( return } - val lastUpdate = _lastUpdate + _isUpdateActive = true + _isImmediateUpdateScheduled = false - _lastUpdate = now - _updateActive = true - - val delta = max(0, now - lastUpdate) + val lastPush = _lastPush + val pushDelta = max(0, now - lastPush) try { - val duration = source.onPull(this, now, delta) - val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration - - // Reset update flags - _flag = 0 + // Pull the source if (1) `pull` is called or (2) the timer of the source has expired + val deadline = if (_isPulled || _deadline == now) { + val lastPull = _lastPull + val pullDelta = max(0, now - lastPull) + + _isPulled = false + _lastPull = now + + val duration = source.onPull(this, now, pullDelta) + if (duration != Long.MAX_VALUE) + now + duration + else + duration + } else { + _deadline + } // Check whether the state has changed after [consumer.onNext] when (_state) { State.Active -> { - logic.onPush(this, now, delta, _limit) + val demand = _demand + if (demand != _activeDemand) { + _lastPush = now - // Schedule an update at the new deadline - scheduleUpdate(now, newDeadline) + logic.onPush(this, now, pushDelta, demand) + } } - State.Stopped -> doStop(now, delta) + State.Closed -> doStop(now, pushDelta) State.Pending -> throw IllegalStateException("Illegal transition to pending state") } // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value - val newLimit = _limit + val newLimit = _demand // Flush the changes to the flow - _activeLimit = newLimit - _deadline = newDeadline + _activeDemand = newLimit + _deadline = deadline _rate = min(capacity, newLimit) + + // Schedule an update at the new deadline + scheduleDelayed(now, deadline) } catch (cause: Throwable) { - doFail(now, delta, cause) + doFail(now, pushDelta, cause) } finally { - _updateActive = false + _isUpdateActive = false } } @@ -237,7 +260,7 @@ internal class FlowConsumerContextImpl( fun tryReschedule(now: Long) { val deadline = _deadline if (deadline > now && deadline != Long.MAX_VALUE) { - scheduleUpdate(now, deadline) + scheduleDelayed(now, deadline) } } @@ -255,7 +278,7 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, timestamp, delta) } catch (cause: Throwable) { - doFail(timestamp, max(0, timestamp - _lastUpdate), cause) + doFail(timestamp, max(0, timestamp - _lastPull), cause) } } @@ -272,7 +295,7 @@ internal class FlowConsumerContextImpl( doFail(now, delta, cause) } finally { _deadline = Long.MAX_VALUE - _limit = 0.0 + _demand = 0.0 } } @@ -308,16 +331,24 @@ internal class FlowConsumerContextImpl( } /** - * Schedule an update for this resource context. + * Schedule an immediate update for this connection. */ - private fun scheduleUpdate(now: Long) { + private fun scheduleImmediate() { + // In case an immediate update is already scheduled, no need to do anything + if (_isImmediateUpdateScheduled) { + return + } + + _isImmediateUpdateScheduled = true + + val now = _clock.millis() engine.scheduleImmediate(now, this) } /** * Schedule a delayed update for this resource context. */ - private fun scheduleUpdate(now: Long, target: Long) { + private fun scheduleDelayed(now: Long, target: Long) { val timers = _timers if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { timers.addFirst(engine.scheduleDelayed(now, this, target)) @@ -325,32 +356,22 @@ internal class FlowConsumerContextImpl( } /** - * The state of a resource context. + * The state of a flow connection. */ private enum class State { /** - * The resource context is pending and the resource is waiting to be consumed. + * The connection is pending and the consumer is waiting to consume the source. */ Pending, /** - * The resource context is active and the resource is currently being consumed. + * The connection is active and the source is currently being consumed. */ Active, /** - * The resource context is stopped and the resource cannot be consumed anymore. + * The connection is closed and the source cannot be consumed through this connection anymore. */ - Stopped + Closed } - - /** - * A flag to indicate that the context should be invalidated. - */ - private val FLAG_INVALIDATE = 0b01 - - /** - * A flag to indicate that the context should be interrupted. - */ - private val FLAG_INTERRUPT = 0b10 } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index 061ebea6..380fd38a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -28,7 +28,6 @@ import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.internal.FlowConsumerContextImpl import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource /** * A test suite for the [FlowConsumerContextImpl] class. @@ -56,22 +55,6 @@ class FlowConsumerContextTest { } @Test - fun testIntermediateFlush() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = FixedFlowSource(1.0, 1.0) - - val logic = spyk(object : FlowConsumerLogic {}) - val context = FlowConsumerContextImpl(engine, consumer, logic) - context.capacity = 1.0 - - context.start() - delay(1) // Delay 1 ms to prevent hitting the fast path - engine.scheduleSync(engine.clock.millis(), context) - - verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) } - } - - @Test fun testDoubleStart() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { |
