diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
17 files changed, 121 insertions, 146 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index 4685a755..a49826f4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - source.onStop(conn, now, delta) + source.onStop(conn, now) if (!cont.isCompleted) { cont.resume(Unit) @@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return try { - source.onPull(conn, now, delta) + source.onPull(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - source.onConverge(conn, now, delta) + source.onConverge(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index 50fbc9c7..1d3adb10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -31,10 +31,9 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param rate The requested processing rate of the source. */ - public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} + public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {} /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -44,17 +43,15 @@ public interface FlowConsumerLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} + public fun onConverge(ctx: FlowConsumerContext, now: Long) {} /** * This method is invoked when the [FlowSource] completed or failed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. * @param cause The cause of the failure or `null` if the source completed. */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt index d1afda6f..62cb10d1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -30,7 +30,6 @@ public interface FlowConvergenceListener { * This method is invoked when the system has converged to a steady-state. * * @param now The timestamp at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(now: Long, delta: Long) {} + public fun onConverge(now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index a5663293..0ad18f6a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -25,7 +25,11 @@ package org.opendc.simulator.flow import mu.KotlinLogging import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max + +/** + * The logging instance of this connection. + */ +private val logger = KotlinLogging.logger {} /** * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. @@ -40,11 +44,6 @@ public class FlowForwarder( private val isCoupled: Boolean = false ) : FlowSource, FlowConsumer, AutoCloseable { /** - * The logging instance of this connection. - */ - private val logger = KotlinLogging.logger {} - - /** * The delegate [FlowSource]. */ private var delegate: FlowSource? = null @@ -81,8 +80,6 @@ public class FlowForwarder( _innerCtx?.pull(now) } - @JvmField var lastPull = Long.MAX_VALUE - override fun push(rate: Double) { if (delegate == null) { return @@ -102,8 +99,7 @@ public class FlowForwarder( if (hasDelegateStarted) { val now = engine.clock.millis() - val delta = max(0, now - lastPull) - delegate.onStop(this, now, delta) + delegate.onStop(this, now) } } } @@ -163,7 +159,7 @@ public class FlowForwarder( } } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _innerCtx = null val delegate = delegate @@ -171,25 +167,24 @@ public class FlowForwarder( reset() try { - delegate.onStop(this._ctx, now, delta) + delegate.onStop(this._ctx, now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } } } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = delegate if (!hasDelegateStarted) { start() } - _ctx.lastPull = now - updateCounters(conn, delta) + updateCounters(conn, now) return try { - delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE + delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -198,10 +193,10 @@ public class FlowForwarder( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate?.onConverge(this._ctx, now, delta) - listener?.onConverge(now, delta) + delegate?.onConverge(this._ctx, now) + listener?.onConverge(now) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -217,8 +212,10 @@ public class FlowForwarder( val delegate = delegate ?: return try { - delegate.onStart(_ctx, engine.clock.millis()) + val now = engine.clock.millis() + delegate.onStart(_ctx, now) hasDelegateStarted = true + _lastUpdate = now } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } reset() @@ -242,11 +239,15 @@ public class FlowForwarder( * The requested flow rate. */ private var _demand: Double = 0.0 + private var _lastUpdate = 0L /** * Update the flow counters for the transformer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long) { + private fun updateCounters(ctx: FlowConnection, now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + val delta = now - lastUpdate if (delta <= 0) { return } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt index 6867bcef..af702701 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt @@ -45,20 +45,20 @@ public class FlowMapper( source.onStart(delegate, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { val delegate = checkNotNull(_conn) { "Invariant violation" } _conn = null - source.onStop(delegate, now, delta) + source.onStop(delegate, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val delegate = checkNotNull(_conn) { "Invariant violation" } - return source.onPull(delegate, now, delta) + return source.onPull(delegate, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { val delegate = _conn ?: return - source.onConverge(delegate, now, delta) + source.onConverge(delegate, now) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index e9094443..d0324ce8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -104,34 +104,39 @@ public class FlowSink( * [FlowConsumerLogic] of a sink. */ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { - updateCounters(ctx, delta, rate, ctx.capacity) + updateCounters(ctx, now, rate, ctx.capacity) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta, 0.0, 0.0) + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { + updateCounters(ctx, now, 0.0, 0.0) _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + override fun onConverge(ctx: FlowConsumerContext, now: Long) { + parent?.onConverge(now) } /** * The previous demand and capacity for the consumer. */ private val _previous = DoubleArray(2) + private var _previousUpdate = Long.MAX_VALUE /** * Update the counters of the flow consumer. */ - private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) { + val previousUpdate = _previousUpdate + _previousUpdate = now + val delta = now - previousUpdate + val counters = counters val previous = _previous val demand = previous[0] diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 3a7e52aa..a48ac18e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -42,19 +42,17 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the source finished. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. */ - public fun onStop(conn: FlowConnection, now: Long, delta: Long) {} + public fun onStop(conn: FlowConnection, now: Long) {} /** * This method is invoked when the source is pulled. * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the pull is occurring. - * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. * @return The duration after which the resource consumer should be pulled again. */ - public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long + public fun onPull(conn: FlowConnection, now: Long): Long /** * This method is invoked when the flow graph has converged into a steady-state system. @@ -64,7 +62,6 @@ public interface FlowSource { * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. - * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {} + public fun onConverge(conn: FlowConnection, now: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 1d39d61c..bc6bae71 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.internal import mu.KotlinLogging import org.opendc.simulator.flow.* import java.util.* -import kotlin.math.max import kotlin.math.min /** @@ -124,14 +123,6 @@ internal class FlowConsumerContextImpl( private var _flags: Int = 0 /** - * The timestamp of calls to the callbacks. - */ - private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` - private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` - private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` - - /** * The timers at which the context is scheduled to be interrupted. */ private var _timer: Long = Long.MAX_VALUE @@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl( try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { - val lastPull = _lastPull - val delta = max(0, now - lastPull) - // Update state before calling into the outside world, so it observes a consistent state - _lastPull = now _flags = (flags and ConnPulled.inv()) or ConnUpdateActive hasUpdated = true - val duration = source.onPull(this, now, delta) + val duration = source.onPull(this, now) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl( // Push to the consumer if the rate of the source has changed (after a call to `push`) if (flags and ConnPushed != 0) { - val lastPush = _lastPush - val delta = max(0, now - lastPush) - // Update state before calling into the outside world, so it observes a consistent state - _lastPush = now _flags = (flags and ConnPushed.inv()) or ConnUpdateActive hasUpdated = true - logic.onPush(this, now, delta, _demand) + logic.onPush(this, now, _demand) // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags @@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl( // Call the source converge callback if it has enabled convergence if (flags and ConnConvergeSource != 0) { - val delta = max(0, now - _lastSourceConvergence) - _lastSourceConvergence = now - - source.onConverge(this, now, delta) + source.onConverge(this, now) } // Call the consumer callback if it has enabled convergence if (flags and ConnConvergeConsumer != 0) { - val delta = max(0, now - _lastConsumerConvergence) - _lastConsumerConvergence = now - - logic.onConverge(this, now, delta) + logic.onConverge(this, now) } } catch (cause: Throwable) { // Invoke the finish callbacks @@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl( */ private fun doStopSource(now: Long) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) @@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl( */ private fun doFailSource(now: Long, cause: Throwable) { try { - source.onStop(this, now, max(0, now - _lastPull)) + source.onStop(this, now) } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) @@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl( */ private fun doFinishConsumer(now: Long, cause: Throwable?) { try { - logic.onFinish(this, now, max(0, now - _lastPush), cause) + logic.onFinish(this, now, cause) } catch (e: Throwable) { e.addSuppressed(cause) logger.error(e) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 7fe0c1b7..1d7d22ef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque -import kotlin.math.max /** * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means @@ -139,16 +138,8 @@ public class ForwardingFlowMultiplexer( override fun flushCounters(input: FlowConsumer) {} - private var _lastConverge = Long.MAX_VALUE - - override fun onConverge(now: Long, delta: Long) { - val listener = listener - if (listener != null) { - val lastConverge = _lastConverge - _lastConverge = now - val duration = max(0, now - lastConverge) - listener.onConverge(now, duration) - } + override fun onConverge(now: Long) { + listener?.onConverge(now) } /** @@ -167,9 +158,9 @@ public class ForwardingFlowMultiplexer( forwarder.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { forwarder.cancel() - forwarder.onStop(conn, now, delta) + forwarder.onStop(conn, now) } override fun toString(): String = "ForwardingFlowMultiplexer.Output" diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7ee4d326..cc831862 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -257,7 +257,7 @@ public class MaxMinFlowMultiplexer( _lastConverge = now _lastConvergeInput = input - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } } @@ -285,13 +285,11 @@ public class MaxMinFlowMultiplexer( * This method is invoked when one of the outputs converges. */ fun convergeOutput(output: Output, now: Long) { - val lastConverge = _lastConverge val parent = parent if (parent != null) { _lastConverge = now - - parent.onConverge(now, max(0, now - lastConverge)) + parent.onConverge(now) } if (!output.isActive) { @@ -489,7 +487,7 @@ public class MaxMinFlowMultiplexer( val previousUpdate = _previousUpdate _previousUpdate = now - val delta = (now - previousUpdate).coerceAtLeast(0) + val delta = now - previousUpdate if (delta <= 0) { return } @@ -647,7 +645,6 @@ public class MaxMinFlowMultiplexer( override fun onPush( ctx: FlowConsumerContext, now: Long, - delta: Long, rate: Double ) { doUpdateCounters(now) @@ -660,7 +657,7 @@ public class MaxMinFlowMultiplexer( scheduler.trigger(now) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { doUpdateCounters(now) limit = 0.0 @@ -672,7 +669,7 @@ public class MaxMinFlowMultiplexer( _ctx = null } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onConverge(ctx: FlowConsumerContext, now: Long) { scheduler.convergeInput(this, now) } @@ -777,7 +774,7 @@ public class MaxMinFlowMultiplexer( scheduler.registerOutput(this) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _conn = null capacity = 0.0 isActive = false @@ -785,7 +782,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterOutput(this, now) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity @@ -808,7 +805,7 @@ public class MaxMinFlowMultiplexer( } } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { if (_isActivationOutput) { scheduler.convergeOutput(this, now) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt index d9779c6a..6cfcc82c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization } private var remainingAmount = amount + private var lastPull: Long = 0L + + override fun onStart(conn: FlowConnection, now: Long) { + lastPull = now + } + + override fun onPull(conn: FlowConnection, now: Long): Long { + val lastPull = lastPull + this.lastPull = now + val delta = (now - lastPull).coerceAtLeast(0) - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val consumed = conn.rate * delta / 1000.0 val limit = conn.capacity * utilization diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 6dd60d95..80127fb5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -53,21 +53,21 @@ public class FlowSourceRateAdapter( delegate.onStart(conn, now) } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { try { - delegate.onStop(conn, now, delta) + delegate.onStop(conn, now) } finally { rate = 0.0 } } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + override fun onPull(conn: FlowConnection, now: Long): Long { + return delegate.onPull(conn, now) } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { try { - delegate.onConverge(conn, now, delta) + delegate.onConverge(conn, now) } finally { rate = conn.rate } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index ae537845..c9a52128 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource _iterator = trace.iterator() } - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + override fun onStop(conn: FlowConnection, now: Long) { _iterator = null } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget if (nextTarget > now) { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index fe39eb2c..e7b25554 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -36,7 +36,7 @@ class FlowConsumerContextTest { fun testFlushWithoutCommand() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -57,7 +57,7 @@ class FlowConsumerContextTest { fun testDoubleStart() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(0.0) 1000 @@ -82,7 +82,7 @@ class FlowConsumerContextTest { fun testIdempotentCapacityChange() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (now == 0L) { conn.push(1.0) 1000 @@ -99,6 +99,6 @@ class FlowConsumerContextTest { context.start() context.capacity = 4200.0 - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 1) { consumer.onPull(any(), any()) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 12e72b8f..8b090593 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -45,7 +45,7 @@ internal class FlowForwarderTest { launch { source.consume(forwarder) } forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -66,7 +66,7 @@ internal class FlowForwarderTest { forwarder.consume(object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -87,7 +87,7 @@ internal class FlowForwarderTest { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -113,7 +113,7 @@ internal class FlowForwarderTest { val forwarder = FlowForwarder(engine) val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onStop(any(), any(), any()) } + verify(exactly = 0) { consumer.onStop(any(), any()) } } @Test @@ -140,7 +140,7 @@ internal class FlowForwarderTest { forwarder.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -158,7 +158,7 @@ internal class FlowForwarderTest { source.cancel() verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any()) } } @Test @@ -168,7 +168,7 @@ internal class FlowForwarderTest { val source = FlowSink(engine, 2000.0) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -198,7 +198,7 @@ internal class FlowForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any(), any()) } + verify(exactly = 1) { source.onPull(any(), any()) } } @Test @@ -214,11 +214,13 @@ internal class FlowForwarderTest { yield() - assertEquals(2.0, source.counters.actual) - assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } - assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } - assertEquals(2000, clock.millis()) + assertAll( + { assertEquals(2.0, source.counters.actual) }, + { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, + { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, + { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, + { assertEquals(2000, clock.millis()) } + ) } @Test @@ -246,7 +248,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { throw IllegalStateException("Test") } }) @@ -269,7 +271,7 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } @@ -301,11 +303,11 @@ internal class FlowForwarderTest { conn.shouldSourceConverge = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } - override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + override fun onConverge(conn: FlowConnection, now: Long) { throw IllegalStateException("Test") } }) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 70c75864..726ddbf7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -65,7 +65,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any(), any()) } + verify(exactly = 3) { consumer.onPull(any(), any()) } } @Test @@ -95,7 +95,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { conn.close() return Long.MAX_VALUE } @@ -122,7 +122,7 @@ internal class FlowSinkTest { resCtx = conn } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -154,7 +154,7 @@ internal class FlowSinkTest { throw IllegalStateException("Hi") } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return Long.MAX_VALUE } } @@ -173,7 +173,7 @@ internal class FlowSinkTest { val consumer = object : FlowSource { var isFirst = true - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) @@ -231,7 +231,7 @@ internal class FlowSinkTest { val provider = FlowSink(engine, capacity) val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE } provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 187dacd9..ef15f711 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest { isFirst = true } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + override fun onPull(conn: FlowConnection, now: Long): Long { return if (isFirst) { isFirst = false conn.push(1.0) |
