diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
4 files changed, 241 insertions, 254 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index 549a338b..b4eb6a38 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -44,6 +44,8 @@ public class FlowSink( override fun createLogic(): FlowConsumerLogic { return object : FlowConsumerLogic { + private val parent = this@FlowSink.parent + override fun onPush( ctx: FlowConsumerContext, now: Long, diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt new file mode 100644 index 00000000..81ed9f26 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * States of the flow connection. + */ +internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source +internal const val ConnActive = 1 // Connection is active and the source is currently being consumed +internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore +internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection + +/** + * Flags of the flow connection + */ +internal const val ConnPulled = 1 shl 2 // The source should be pulled +internal const val ConnPushed = 1 shl 3 // The source has pushed a value +internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active +internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending +internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary +internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending +internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source +internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index c087a28d..9d36483e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -24,7 +24,7 @@ package org.opendc.simulator.flow.internal import mu.KotlinLogging import org.opendc.simulator.flow.* -import java.util.ArrayDeque +import java.util.* import kotlin.math.max import kotlin.math.min @@ -44,20 +44,18 @@ internal class FlowConsumerContextImpl( /** * The capacity of the connection. */ - override var capacity: Double = 0.0 + override var capacity: Double + get() = _capacity set(value) { - val oldValue = field + val oldValue = _capacity // Only changes will be propagated if (value != oldValue) { - field = value - - // Do not pull the source if it has not been started yet - if (_state == State.Active) { - pull() - } + _capacity = value + pull() } } + private var _capacity: Double = 0.0 /** * The current processing rate of the connection. @@ -75,8 +73,25 @@ internal class FlowConsumerContextImpl( /** * Flags to control the convergence of the consumer and source. */ - override var shouldConsumerConverge: Boolean = false override var shouldSourceConverge: Boolean = false + set(value) { + field = value + _flags = + if (value) + _flags or ConnConvergeSource + else + _flags and ConnConvergeSource.inv() + } + override var shouldConsumerConverge: Boolean = false + set(value) { + field = value + + _flags = + if (value) + _flags or ConnConvergeConsumer + else + _flags and ConnConvergeConsumer.inv() + } /** * The clock to track simulation time. @@ -84,36 +99,15 @@ internal class FlowConsumerContextImpl( private val _clock = engine.clock /** - * A flag to indicate the state of the connection. - */ - private var _state = State.Pending - - /** * The current state of the connection. */ private var _demand: Double = 0.0 // The current (pending) demand of the source - private var _activeDemand: Double = 0.0 // The previous demand of the source private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer /** - * A flag to indicate that the source should be pulled. - */ - private var _isPulled = false - - /** - * A flag to indicate that an update is active. - */ - private var _isUpdateActive = false - - /** - * A flag to indicate that an immediate update is scheduled. + * The flags of the flow connection, indicating certain actions. */ - private var _isImmediateUpdateScheduled = false - - /** - * A flag that indicates to the [FlowEngine] that the context is already enqueued to converge. - */ - private var _willConverge: Boolean = false + private var _flags: Int = 0 /** * The timestamp of calls to the callbacks. @@ -130,44 +124,53 @@ internal class FlowConsumerContextImpl( private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5) override fun start() { - check(_state == State.Pending) { "Consumer is already started" } + check(_flags and ConnState == ConnPending) { "Consumer is already started" } engine.batch { - source.onStart(this, _clock.millis()) - _state = State.Active + val now = _clock.millis() + source.onStart(this, now) - pull() + // Mark the connection as active and pulled + val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled + scheduleImmediate(now, newFlags) } } override fun close() { - if (_state == State.Closed) { + var flags = _flags + if (flags and ConnState == ConnClosed) { return } engine.batch { - _state = State.Closed - if (!_isUpdateActive) { + // Mark the connection as closed and pulled (in order to converge) + flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled + _flags = flags + + if (flags and ConnUpdateActive == 0) { val now = _clock.millis() doStopSource(now) // FIX: Make sure the context converges - pull() + scheduleImmediate(now, flags) } } } override fun pull() { - if (_state == State.Closed) { + val flags = _flags + if (flags and ConnState != ConnActive) { return } - _isPulled = true - scheduleImmediate() + // Mark connection as pulled + scheduleImmediate(_clock.millis(), flags or ConnPulled) } override fun flush() { + val flags = _flags + // Do not attempt to flush the connection if the connection is closed or an update is already active - if (_state == State.Closed || _isUpdateActive) { + if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) { return } @@ -181,118 +184,145 @@ internal class FlowConsumerContextImpl( _demand = rate - // Invalidate only if the active demand is changed and no update is active - // If an update is active, it will already get picked up at the end of the update - if (_activeDemand != rate && !_isUpdateActive) { - scheduleImmediate() - } - } + val flags = _flags - /** - * Determine whether the state of the flow connection should be updated. - */ - fun shouldUpdate(timestamp: Long): Boolean { - // The flow connection should be updated for three reasons: - // (1) The source should be pulled (after a call to `pull`) - // (2) The demand of the source has changed (after a call to `push`) - // (3) The timer of the source expired - return _isPulled || _demand != _activeDemand || _deadline == timestamp + if (flags and ConnUpdateActive != 0) { + // If an update is active, it will already get picked up at the end of the update + _flags = flags or ConnPushed + } else { + // Invalidate only if no update is active + scheduleImmediate(_clock.millis(), flags or ConnPushed) + } } /** * Update the state of the flow connection. * * @param now The current virtual timestamp. - * @return A flag to indicate whether the connection has already been updated before convergence. + * @param visited The queue of connections that have been visited during the cycle. + * @param timerQueue The queue of all pending timers. + * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update. */ - fun doUpdate(now: Long): Boolean { - // The connection will only converge if either the source or the consumer wants the converge callback to be - // invoked. - val shouldConverge = shouldSourceConverge || shouldConsumerConverge - var willConverge = false - if (shouldConverge) { - willConverge = _willConverge - _willConverge = true - } - - val oldState = _state - if (oldState != State.Active) { - return willConverge + fun doUpdate( + now: Long, + visited: ArrayDeque<FlowConsumerContextImpl>, + timerQueue: PriorityQueue<FlowEngineImpl.Timer>, + isImmediate: Boolean + ) { + var flags = _flags + + // Precondition: The flow connection must be active + if (flags and ConnState != ConnActive) { + return } - _isUpdateActive = true - _isImmediateUpdateScheduled = false + val deadline = _deadline + val reachedDeadline = deadline == now + var newDeadline = deadline + var hasUpdated = false try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired - val deadline = if (_isPulled || _deadline == now) { + newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { val lastPull = _lastPull val delta = max(0, now - lastPull) - _isPulled = false + // Update state before calling into the outside world, so it observes a consistent state _lastPull = now + _flags = (flags and ConnPulled.inv()) or ConnUpdateActive + hasUpdated = true val duration = source.onPull(this, now, delta) + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = _flags + if (duration != Long.MAX_VALUE) now + duration else duration } else { - _deadline + deadline } - // Check whether the state has changed after [consumer.onNext] - when (_state) { - State.Active -> { - val demand = _demand - if (demand != _activeDemand) { - val lastPush = _lastPush - val delta = max(0, now - lastPush) - - _lastPush = now - - logic.onPush(this, now, delta, demand) - } - } - State.Closed -> doStopSource(now) - State.Pending -> throw IllegalStateException("Illegal transition to pending state") - } + // Push to the consumer if the rate of the source has changed (after a call to `push`) + val newState = flags and ConnState + if (newState == ConnActive && flags and ConnPushed != 0) { + val lastPush = _lastPush + val delta = max(0, now - lastPush) - // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value - val newLimit = _demand + // Update state before calling into the outside world, so it observes a consistent state + _lastPush = now + _flags = (flags and ConnPushed.inv()) or ConnUpdateActive + hasUpdated = true - // Flush the changes to the flow - _activeDemand = newLimit - _deadline = deadline - _rate = min(capacity, newLimit) + logic.onPush(this, now, delta, _demand) - // Schedule an update at the new deadline - scheduleDelayed(now, deadline) + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = _flags + } else if (newState == ConnClosed) { + hasUpdated = true + + // The source has called [FlowConnection.close], so clean up the connection + doStopSource(now) + } } catch (cause: Throwable) { + // Mark the connection as closed + flags = (flags and ConnState.inv()) or ConnClosed + doFailSource(now, cause) - } finally { - _isUpdateActive = false } - return willConverge - } + // Check whether the connection needs to be added to the visited queue. This is the case when: + // (1) An update was performed (either a push or a pull) + // (2) Either the source or consumer want to converge, and + // (3) Convergence is not already pending (ConnConvergePending) + if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) { + visited.add(this) + flags = flags or ConnConvergePending + } - /** - * Prune the elapsed timers from this context. - */ - fun updateTimers() { - // Invariant: Any pending timer should only point to a future timestamp - // See also `scheduleDelayed` - _timer = _pendingTimers.poll() - } + // Compute the new flow rate of the connection + // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value + _rate = min(_capacity, _demand) - /** - * Try to re-schedule the resource context in case it was skipped. - */ - fun tryReschedule(now: Long) { - val deadline = _deadline - if (deadline > now && deadline != Long.MAX_VALUE) { - scheduleDelayed(now, deadline) + // Indicate that no update is active anymore and flush the flags + _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv() + + val pendingTimers = _pendingTimers + + // Prune the head timer if this is a delayed update + val timer = if (!isImmediate) { + // Invariant: Any pending timer should only point to a future timestamp + // See also `scheduleDelayed` + val timer = pendingTimers.poll() + _timer = timer + timer + } else { + _timer + } + + // Set the new deadline and schedule a delayed update for that deadline + _deadline = newDeadline + + // Check whether we need to schedule a new timer for this connection. That is the case when: + // (1) The deadline is valid (not the maximum value) + // (2) The connection is active + // (3) The current active timer for the connection points to a later deadline + if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) { + // Ignore any deadline scheduled at the maximum value + // This indicates that the source does not want to register a timer + return + } + + // Construct a timer for the new deadline and add it to the global queue of timers + val newTimer = FlowEngineImpl.Timer(this, newDeadline) + _timer = newTimer + timerQueue.add(newTimer) + + // A timer already exists for this connection, so add it to the queue of pending timers + if (timer != null) { + pendingTimers.addFirst(timer) } } @@ -300,17 +330,22 @@ internal class FlowConsumerContextImpl( * This method is invoked when the system converges into a steady state. */ fun onConverge(now: Long) { - _willConverge = false - try { - if (_state == State.Active && shouldSourceConverge) { + val flags = _flags + + // The connection is converging now, so unset the convergence pending flag + _flags = flags and ConnConvergePending.inv() + + // Call the source converge callback if it has enabled convergence and the connection is active + if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) { val delta = max(0, now - _lastSourceConvergence) _lastSourceConvergence = now source.onConverge(this, now, delta) } - if (shouldConsumerConverge) { + // Call the consumer callback if it has enabled convergence + if (flags and ConnConvergeConsumer != 0) { val delta = max(0, now - _lastConsumerConvergence) _lastConsumerConvergence = now @@ -369,57 +404,16 @@ internal class FlowConsumerContextImpl( /** * Schedule an immediate update for this connection. */ - private fun scheduleImmediate() { + private fun scheduleImmediate(now: Long, flags: Int) { // In case an immediate update is already scheduled, no need to do anything - if (_isImmediateUpdateScheduled) { + if (flags and ConnUpdatePending != 0) { + _flags = flags return } - _isImmediateUpdateScheduled = true + // Mark the connection that there is an update pending + _flags = flags or ConnUpdatePending - val now = _clock.millis() engine.scheduleImmediate(now, this) } - - /** - * Schedule a delayed update for this resource context. - */ - private fun scheduleDelayed(now: Long, target: Long) { - // Ignore any target scheduled at the maximum value - // This indicates that the sources does not want to register a timer - if (target == Long.MAX_VALUE) { - return - } - - val timer = _timer - - if (timer == null) { - // No existing timer exists, so schedule a new timer and update the head - _timer = engine.scheduleDelayed(now, this, target) - } else if (target < timer.target) { - // Existing timer is further in the future, so schedule a new timer ahead of it - _timer = engine.scheduleDelayed(now, this, target) - _pendingTimers.addFirst(timer) - } - } - - /** - * The state of a flow connection. - */ - private enum class State { - /** - * The connection is pending and the consumer is waiting to consume the source. - */ - Pending, - - /** - * The connection is active and the source is currently being consumed. - */ - Active, - - /** - * The connection is closed and the source cannot be consumed through this connection anymore. - */ - Closed - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index c8170a43..019b5f10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /** * The systems that have been visited during the engine cycle. */ - private val visited = ArrayDeque<FlowConsumerContextImpl>() + private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque() /** * The index in the batch stack. @@ -71,31 +71,18 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va private var batchIndex = 0 /** - * A flag to indicate that the engine is currently active. - */ - private val isRunning: Boolean - get() = batchIndex > 0 - - /** * Update the specified [ctx] synchronously. */ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { - if (!ctx.doUpdate(now)) { - visited.add(ctx) - } + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) // In-case the engine is already running in the call-stack, return immediately. The changes will be picked // up by the active engine. - if (isRunning) { + if (batchIndex > 0) { return } - try { - batchIndex++ - runEngine(now) - } finally { - batchIndex-- - } + runEngine(now) } /** @@ -109,36 +96,11 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // In-case the engine is already running in the call-stack, return immediately. The changes will be picked // up by the active engine. - if (isRunning) { + if (batchIndex > 0) { return } - try { - batchIndex++ - runEngine(now) - } finally { - batchIndex-- - } - } - - /** - * Schedule the engine to run at [target] to update the flow contexts. - * - * This method will override earlier calls to this method for the same [ctx]. - * - * @param now The current virtual timestamp. - * @param ctx The flow context to which the event applies. - * @param target The timestamp when the interrupt should happen. - */ - fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer { - val futureQueue = futureQueue - - require(target >= now) { "Timestamp must be in the future" } - - val timer = Timer(ctx, target) - futureQueue.add(timer) - - return timer + runEngine(now) } override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) @@ -149,9 +111,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va override fun popBatch() { try { - // Flush the work if the platform is not already running + // Flush the work if the engine is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - runEngine(clock.millis()) + doRunEngine(clock.millis()) } } finally { batchIndex-- @@ -159,9 +121,21 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va } /** - * Run all the enqueued actions for the specified [timestamp][now]. + * Run the engine and mark as active while running. */ private fun runEngine(now: Long) { + try { + batchIndex++ + doRunEngine(now) + } finally { + batchIndex-- + } + } + + /** + * Run all the enqueued actions for the specified [timestamp][now]. + */ + private fun doRunEngine(now: Long) { val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations @@ -179,27 +153,16 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Execute all scheduled updates at current timestamp while (true) { val timer = futureQueue.peek() ?: break - val ctx = timer.ctx val target = timer.target - assert(target >= now) { "Internal inconsistency: found update of the past" } - if (target > now) { break } - futureQueue.poll() - - // Update the existing timers of the connection - ctx.updateTimers() + assert(target >= now) { "Internal inconsistency: found update of the past" } - if (ctx.shouldUpdate(now)) { - if (!ctx.doUpdate(now)) { - visited.add(ctx) - } - } else { - ctx.tryReschedule(now) - } + futureQueue.poll() + timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -208,17 +171,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Execute all immediate updates while (true) { val ctx = queue.poll() ?: break - - if (ctx.shouldUpdate(now) && !ctx.doUpdate(now)) { - visited.add(ctx) - } + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) } - for (system in visited) { - system.onConverge(now) + while (true) { + val ctx = visited.poll() ?: break + ctx.onConverge(now) } - - visited.clear() } while (queue.isNotEmpty()) // Schedule an engine invocation for the next update to occur. @@ -242,18 +201,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Case 2: A new timer was registered ahead of the other timers. // Solution: Schedule a new scheduler invocation @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout( - target - now, - { - try { - batchIndex++ - runEngine(target) - } finally { - batchIndex-- - } - }, - context - ) + val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context) scheduled.addFirst(Invocation(target, handle)) break } else if (invocation.timestamp < target) { @@ -274,7 +222,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case * the invocation is not needed anymore, it can be cancelled via [cancel]. */ - private data class Invocation( + private class Invocation( @JvmField val timestamp: Long, @JvmField val handle: DisposableHandle ) { |
