summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-01-12 23:17:33 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-18 20:05:10 +0100
commit5a0821e19eed87e91054289051213cb60b4235b4 (patch)
tree3737158b657c8cb2f102621f90d7f5dedddaba77 /opendc-simulator/opendc-simulator-flow
parent9e69eaf1c7b0c4985d37f3f4595e2e2478d389f2 (diff)
refactor(simulator): Remove delta parameter from flow callbacks
This change removes the delta parameter from the callbacks of the flow framework. This parameter was used to indicate the duration in time between the last call and the current call. However, its usefulness was limited since the actual delta values needed by implementors of this method had to be bridged across different flow callbacks.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt43
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt37
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt17
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt11
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt2
17 files changed, 121 insertions, 146 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index 4685a755..a49826f4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -92,9 +92,9 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- source.onStop(conn, now, delta)
+ source.onStop(conn, now)
if (!cont.isCompleted) {
cont.resume(Unit)
@@ -105,18 +105,18 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return try {
- source.onPull(conn, now, delta)
+ source.onPull(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- source.onConverge(conn, now, delta)
+ source.onConverge(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index 50fbc9c7..1d3adb10 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -31,10 +31,9 @@ public interface FlowConsumerLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
* @param rate The requested processing rate of the source.
*/
- public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
+ public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {}
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -44,17 +43,15 @@ public interface FlowConsumerLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+ public fun onConverge(ctx: FlowConsumerContext, now: Long) {}
/**
* This method is invoked when the [FlowSource] completed or failed.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
* @param cause The cause of the failure or `null` if the source completed.
*/
- public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {}
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
index d1afda6f..62cb10d1 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
@@ -30,7 +30,6 @@ public interface FlowConvergenceListener {
* This method is invoked when the system has converged to a steady-state.
*
* @param now The timestamp at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(now: Long, delta: Long) {}
+ public fun onConverge(now: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index a5663293..0ad18f6a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -25,7 +25,11 @@ package org.opendc.simulator.flow
import mu.KotlinLogging
import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.max
+
+/**
+ * The logging instance of this connection.
+ */
+private val logger = KotlinLogging.logger {}
/**
* A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
@@ -40,11 +44,6 @@ public class FlowForwarder(
private val isCoupled: Boolean = false
) : FlowSource, FlowConsumer, AutoCloseable {
/**
- * The logging instance of this connection.
- */
- private val logger = KotlinLogging.logger {}
-
- /**
* The delegate [FlowSource].
*/
private var delegate: FlowSource? = null
@@ -81,8 +80,6 @@ public class FlowForwarder(
_innerCtx?.pull(now)
}
- @JvmField var lastPull = Long.MAX_VALUE
-
override fun push(rate: Double) {
if (delegate == null) {
return
@@ -102,8 +99,7 @@ public class FlowForwarder(
if (hasDelegateStarted) {
val now = engine.clock.millis()
- val delta = max(0, now - lastPull)
- delegate.onStop(this, now, delta)
+ delegate.onStop(this, now)
}
}
}
@@ -163,7 +159,7 @@ public class FlowForwarder(
}
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_innerCtx = null
val delegate = delegate
@@ -171,25 +167,24 @@ public class FlowForwarder(
reset()
try {
- delegate.onStop(this._ctx, now, delta)
+ delegate.onStop(this._ctx, now)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
}
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
start()
}
- _ctx.lastPull = now
- updateCounters(conn, delta)
+ updateCounters(conn, now)
return try {
- delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE
+ delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -198,10 +193,10 @@ public class FlowForwarder(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- delegate?.onConverge(this._ctx, now, delta)
- listener?.onConverge(now, delta)
+ delegate?.onConverge(this._ctx, now)
+ listener?.onConverge(now)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -217,8 +212,10 @@ public class FlowForwarder(
val delegate = delegate ?: return
try {
- delegate.onStart(_ctx, engine.clock.millis())
+ val now = engine.clock.millis()
+ delegate.onStart(_ctx, now)
hasDelegateStarted = true
+ _lastUpdate = now
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
reset()
@@ -242,11 +239,15 @@ public class FlowForwarder(
* The requested flow rate.
*/
private var _demand: Double = 0.0
+ private var _lastUpdate = 0L
/**
* Update the flow counters for the transformer.
*/
- private fun updateCounters(ctx: FlowConnection, delta: Long) {
+ private fun updateCounters(ctx: FlowConnection, now: Long) {
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
+ val delta = now - lastUpdate
if (delta <= 0) {
return
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
index 6867bcef..af702701 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
@@ -45,20 +45,20 @@ public class FlowMapper(
source.onStart(delegate, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
val delegate = checkNotNull(_conn) { "Invariant violation" }
_conn = null
- source.onStop(delegate, now, delta)
+ source.onStop(delegate, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val delegate = checkNotNull(_conn) { "Invariant violation" }
- return source.onPull(delegate, now, delta)
+ return source.onPull(delegate, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
val delegate = _conn ?: return
- source.onConverge(delegate, now, delta)
+ source.onConverge(delegate, now)
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index e9094443..d0324ce8 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -104,34 +104,39 @@ public class FlowSink(
* [FlowConsumerLogic] of a sink.
*/
private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
- delta: Long,
rate: Double
) {
- updateCounters(ctx, delta, rate, ctx.capacity)
+ updateCounters(ctx, now, rate, ctx.capacity)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- updateCounters(ctx, delta, 0.0, 0.0)
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
+ updateCounters(ctx, now, 0.0, 0.0)
_ctx = null
}
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ override fun onConverge(ctx: FlowConsumerContext, now: Long) {
+ parent?.onConverge(now)
}
/**
* The previous demand and capacity for the consumer.
*/
private val _previous = DoubleArray(2)
+ private var _previousUpdate = Long.MAX_VALUE
/**
* Update the counters of the flow consumer.
*/
- private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) {
+ val previousUpdate = _previousUpdate
+ _previousUpdate = now
+ val delta = now - previousUpdate
+
val counters = counters
val previous = _previous
val demand = previous[0]
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 3a7e52aa..a48ac18e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -42,19 +42,17 @@ public interface FlowSource {
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the source finished.
- * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
*/
- public fun onStop(conn: FlowConnection, now: Long, delta: Long) {}
+ public fun onStop(conn: FlowConnection, now: Long) {}
/**
* This method is invoked when the source is pulled.
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the pull is occurring.
- * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
* @return The duration after which the resource consumer should be pulled again.
*/
- public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
+ public fun onPull(conn: FlowConnection, now: Long): Long
/**
* This method is invoked when the flow graph has converged into a steady-state system.
@@ -64,7 +62,6 @@ public interface FlowSource {
*
* @param conn The connection between the source and consumer.
* @param now The virtual timestamp in milliseconds at which the system converged.
- * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {}
+ public fun onConverge(conn: FlowConnection, now: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 1d39d61c..bc6bae71 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.flow.internal
import mu.KotlinLogging
import org.opendc.simulator.flow.*
import java.util.*
-import kotlin.math.max
import kotlin.math.min
/**
@@ -124,14 +123,6 @@ internal class FlowConsumerContextImpl(
private var _flags: Int = 0
/**
- * The timestamp of calls to the callbacks.
- */
- private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull`
- private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush`
- private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence`
- private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence`
-
- /**
* The timers at which the context is scheduled to be interrupted.
*/
private var _timer: Long = Long.MAX_VALUE
@@ -238,15 +229,11 @@ internal class FlowConsumerContextImpl(
try {
// Pull the source if (1) `pull` is called or (2) the timer of the source has expired
newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
- val lastPull = _lastPull
- val delta = max(0, now - lastPull)
-
// Update state before calling into the outside world, so it observes a consistent state
- _lastPull = now
_flags = (flags and ConnPulled.inv()) or ConnUpdateActive
hasUpdated = true
- val duration = source.onPull(this, now, delta)
+ val duration = source.onPull(this, now)
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
@@ -266,15 +253,11 @@ internal class FlowConsumerContextImpl(
// Push to the consumer if the rate of the source has changed (after a call to `push`)
if (flags and ConnPushed != 0) {
- val lastPush = _lastPush
- val delta = max(0, now - lastPush)
-
// Update state before calling into the outside world, so it observes a consistent state
- _lastPush = now
_flags = (flags and ConnPushed.inv()) or ConnUpdateActive
hasUpdated = true
- logic.onPush(this, now, delta, _demand)
+ logic.onPush(this, now, _demand)
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
@@ -372,18 +355,12 @@ internal class FlowConsumerContextImpl(
// Call the source converge callback if it has enabled convergence
if (flags and ConnConvergeSource != 0) {
- val delta = max(0, now - _lastSourceConvergence)
- _lastSourceConvergence = now
-
- source.onConverge(this, now, delta)
+ source.onConverge(this, now)
}
// Call the consumer callback if it has enabled convergence
if (flags and ConnConvergeConsumer != 0) {
- val delta = max(0, now - _lastConsumerConvergence)
- _lastConsumerConvergence = now
-
- logic.onConverge(this, now, delta)
+ logic.onConverge(this, now)
}
} catch (cause: Throwable) {
// Invoke the finish callbacks
@@ -403,7 +380,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doStopSource(now: Long) {
try {
- source.onStop(this, now, max(0, now - _lastPull))
+ source.onStop(this, now)
doFinishConsumer(now, null)
} catch (cause: Throwable) {
doFinishConsumer(now, cause)
@@ -415,7 +392,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doFailSource(now: Long, cause: Throwable) {
try {
- source.onStop(this, now, max(0, now - _lastPull))
+ source.onStop(this, now)
} catch (e: Throwable) {
e.addSuppressed(cause)
doFinishConsumer(now, e)
@@ -427,7 +404,7 @@ internal class FlowConsumerContextImpl(
*/
private fun doFinishConsumer(now: Long, cause: Throwable?) {
try {
- logic.onFinish(this, now, max(0, now - _lastPush), cause)
+ logic.onFinish(this, now, cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
logger.error(e) { "Uncaught exception" }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 7fe0c1b7..1d7d22ef 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.ArrayDeque
-import kotlin.math.max
/**
* A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
@@ -139,16 +138,8 @@ public class ForwardingFlowMultiplexer(
override fun flushCounters(input: FlowConsumer) {}
- private var _lastConverge = Long.MAX_VALUE
-
- override fun onConverge(now: Long, delta: Long) {
- val listener = listener
- if (listener != null) {
- val lastConverge = _lastConverge
- _lastConverge = now
- val duration = max(0, now - lastConverge)
- listener.onConverge(now, duration)
- }
+ override fun onConverge(now: Long) {
+ listener?.onConverge(now)
}
/**
@@ -167,9 +158,9 @@ public class ForwardingFlowMultiplexer(
forwarder.onStart(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
forwarder.cancel()
- forwarder.onStop(conn, now, delta)
+ forwarder.onStop(conn, now)
}
override fun toString(): String = "ForwardingFlowMultiplexer.Output"
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 7ee4d326..cc831862 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -257,7 +257,7 @@ public class MaxMinFlowMultiplexer(
_lastConverge = now
_lastConvergeInput = input
- parent.onConverge(now, max(0, now - lastConverge))
+ parent.onConverge(now)
}
}
@@ -285,13 +285,11 @@ public class MaxMinFlowMultiplexer(
* This method is invoked when one of the outputs converges.
*/
fun convergeOutput(output: Output, now: Long) {
- val lastConverge = _lastConverge
val parent = parent
if (parent != null) {
_lastConverge = now
-
- parent.onConverge(now, max(0, now - lastConverge))
+ parent.onConverge(now)
}
if (!output.isActive) {
@@ -489,7 +487,7 @@ public class MaxMinFlowMultiplexer(
val previousUpdate = _previousUpdate
_previousUpdate = now
- val delta = (now - previousUpdate).coerceAtLeast(0)
+ val delta = now - previousUpdate
if (delta <= 0) {
return
}
@@ -647,7 +645,6 @@ public class MaxMinFlowMultiplexer(
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
- delta: Long,
rate: Double
) {
doUpdateCounters(now)
@@ -660,7 +657,7 @@ public class MaxMinFlowMultiplexer(
scheduler.trigger(now)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
doUpdateCounters(now)
limit = 0.0
@@ -672,7 +669,7 @@ public class MaxMinFlowMultiplexer(
_ctx = null
}
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ override fun onConverge(ctx: FlowConsumerContext, now: Long) {
scheduler.convergeInput(this, now)
}
@@ -777,7 +774,7 @@ public class MaxMinFlowMultiplexer(
scheduler.registerOutput(this)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_conn = null
capacity = 0.0
isActive = false
@@ -785,7 +782,7 @@ public class MaxMinFlowMultiplexer(
scheduler.deregisterOutput(this, now)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
val capacity = capacity
if (capacity != conn.capacity) {
this.capacity = capacity
@@ -808,7 +805,7 @@ public class MaxMinFlowMultiplexer(
}
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
if (_isActivationOutput) {
scheduler.convergeOutput(this, now)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
index d9779c6a..6cfcc82c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -37,8 +37,17 @@ public class FixedFlowSource(private val amount: Double, private val utilization
}
private var remainingAmount = amount
+ private var lastPull: Long = 0L
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ lastPull = now
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ val lastPull = lastPull
+ this.lastPull = now
+ val delta = (now - lastPull).coerceAtLeast(0)
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val consumed = conn.rate * delta / 1000.0
val limit = conn.capacity * utilization
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 6dd60d95..80127fb5 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -53,21 +53,21 @@ public class FlowSourceRateAdapter(
delegate.onStart(conn, now)
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
try {
- delegate.onStop(conn, now, delta)
+ delegate.onStop(conn, now)
} finally {
rate = 0.0
}
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return delegate.onPull(conn, now, delta)
+ override fun onPull(conn: FlowConnection, now: Long): Long {
+ return delegate.onPull(conn, now)
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
try {
- delegate.onConverge(conn, now, delta)
+ delegate.onConverge(conn, now)
} finally {
rate = conn.rate
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
index ae537845..c9a52128 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -37,11 +37,11 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource
_iterator = trace.iterator()
}
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onStop(conn: FlowConnection, now: Long) {
_iterator = null
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
// Check whether the trace fragment was fully consumed, otherwise wait until we have done so
val nextTarget = _nextTarget
if (nextTarget > now) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
index fe39eb2c..e7b25554 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -36,7 +36,7 @@ class FlowConsumerContextTest {
fun testFlushWithoutCommand() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(1.0)
1000
@@ -57,7 +57,7 @@ class FlowConsumerContextTest {
fun testDoubleStart() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(0.0)
1000
@@ -82,7 +82,7 @@ class FlowConsumerContextTest {
fun testIdempotentCapacityChange() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (now == 0L) {
conn.push(1.0)
1000
@@ -99,6 +99,6 @@ class FlowConsumerContextTest {
context.start()
context.capacity = 4200.0
- verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onPull(any(), any()) }
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 12e72b8f..8b090593 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -45,7 +45,7 @@ internal class FlowForwarderTest {
launch { source.consume(forwarder) }
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -66,7 +66,7 @@ internal class FlowForwarderTest {
forwarder.consume(object : FlowSource {
var isFirst = true
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -87,7 +87,7 @@ internal class FlowForwarderTest {
val engine = FlowEngineImpl(coroutineContext, clock)
val forwarder = FlowForwarder(engine)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -113,7 +113,7 @@ internal class FlowForwarderTest {
val forwarder = FlowForwarder(engine)
val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -122,7 +122,7 @@ internal class FlowForwarderTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 0) { consumer.onStop(any(), any()) }
}
@Test
@@ -140,7 +140,7 @@ internal class FlowForwarderTest {
forwarder.cancel()
verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any()) }
}
@Test
@@ -158,7 +158,7 @@ internal class FlowForwarderTest {
source.cancel()
verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any()) }
}
@Test
@@ -168,7 +168,7 @@ internal class FlowForwarderTest {
val source = FlowSink(engine, 2000.0)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -198,7 +198,7 @@ internal class FlowForwarderTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any(), any()) }
+ verify(exactly = 1) { source.onPull(any(), any()) }
}
@Test
@@ -214,11 +214,13 @@ internal class FlowForwarderTest {
yield()
- assertEquals(2.0, source.counters.actual)
- assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
- assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
- assertEquals(2000, clock.millis())
+ assertAll(
+ { assertEquals(2.0, source.counters.actual) },
+ { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
+ { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
+ { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
+ { assertEquals(2000, clock.millis()) }
+ )
}
@Test
@@ -246,7 +248,7 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
throw IllegalStateException("Test")
}
})
@@ -269,7 +271,7 @@ internal class FlowForwarderTest {
try {
forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
@@ -301,11 +303,11 @@ internal class FlowForwarderTest {
conn.shouldSourceConverge = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
- override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ override fun onConverge(conn: FlowConnection, now: Long) {
throw IllegalStateException("Test")
}
})
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index 70c75864..726ddbf7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -65,7 +65,7 @@ internal class FlowSinkTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 3) { consumer.onPull(any(), any()) }
}
@Test
@@ -95,7 +95,7 @@ internal class FlowSinkTest {
val provider = FlowSink(engine, capacity)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
conn.close()
return Long.MAX_VALUE
}
@@ -122,7 +122,7 @@ internal class FlowSinkTest {
resCtx = conn
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -154,7 +154,7 @@ internal class FlowSinkTest {
throw IllegalStateException("Hi")
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return Long.MAX_VALUE
}
}
@@ -173,7 +173,7 @@ internal class FlowSinkTest {
val consumer = object : FlowSource {
var isFirst = true
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)
@@ -231,7 +231,7 @@ internal class FlowSinkTest {
val provider = FlowSink(engine, capacity)
val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
}
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 187dacd9..ef15f711 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -112,7 +112,7 @@ internal class ForwardingFlowMultiplexerTest {
isFirst = true
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ override fun onPull(conn: FlowConnection, now: Long): Long {
return if (isFirst) {
isFirst = false
conn.push(1.0)