diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-30 16:27:45 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:41 +0200 |
| commit | 559ac2327b8aa319fb8ab4558d4f4aa3382349f4 (patch) | |
| tree | b4c7640b57485c928479fe7d855bfbee8fc18e23 /opendc-simulator/opendc-simulator-flow | |
| parent | 94783ff9d8cd81275fefd5804ac99f98e2dee3a4 (diff) | |
perf(simulator): Make convergence callback optional
This change adds two new properties for controlling whether the
convergence callbacks of the source and consumer respectively should be
invoked. This saves a lot of unnecessary calls for stages that do not
have any implementation of the `onConvergence` method.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
11 files changed, 91 insertions, 29 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index fa833961..c327e1e9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -42,6 +42,11 @@ public interface FlowConnection : AutoCloseable { public val demand: Double /** + * A flag to control whether [FlowSource.onConverge] should be invoked for this source. + */ + public var shouldSourceConverge: Boolean + + /** * Pull the source. */ public fun pull() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index 75b2d25b..15f9b93b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -34,6 +34,11 @@ public interface FlowConsumerContext : FlowConnection { public override var capacity: Double /** + * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer. + */ + public var shouldConsumerConverge: Boolean + + /** * Start the flow over the connection. */ public fun start() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index ef94ab22..50fbc9c7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -39,6 +39,9 @@ public interface FlowConsumerLogic { /** * This method is invoked when the flow graph has converged into a steady-state system. * + * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method + * will not be invoked. + * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt index db6aa69f..d1afda6f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -23,21 +23,14 @@ package org.opendc.simulator.flow /** - * A system of possible multiple sub-resources. - * - * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the - * resource provider. + * A listener interface for when a flow stage has converged into a steady-state. */ -public interface FlowSystem { - /** - * The parent system to which this system belongs or `null` if it has no parent. - */ - public val parent: FlowSystem? - +public interface FlowConvergenceListener { /** * This method is invoked when the system has converged to a steady-state. * - * @param timestamp The timestamp at which the system converged. + * @param now The timestamp at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onConverge(timestamp: Long) + public fun onConverge(now: Long, delta: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index ab5b31c2..17de601a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -52,6 +52,12 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled * The exposed [FlowConnection]. */ private val _ctx = object : FlowConnection { + override var shouldSourceConverge: Boolean = false + set(value) { + field = value + _innerCtx?.shouldSourceConverge = value + } + override val capacity: Double get() = _innerCtx?.capacity ?: 0.0 @@ -141,6 +147,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onStart(conn: FlowConnection, now: Long) { _innerCtx = conn + + if (_ctx.shouldSourceConverge) { + conn.shouldSourceConverge = true + } } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index fc590177..549a338b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -32,9 +32,16 @@ package org.opendc.simulator.flow public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, - private val parent: FlowSystem? = null + private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + override fun start(ctx: FlowConsumerContext) { + if (parent != null) { + ctx.shouldConsumerConverge = true + } + super.start(ctx) + } + override fun createLogic(): FlowConsumerLogic { return object : FlowConsumerLogic { override fun onPush( @@ -52,7 +59,7 @@ public class FlowSink( } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now) + parent?.onConverge(now, delta) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index a4f624ef..3a7e52aa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -59,6 +59,9 @@ public interface FlowSource { /** * This method is invoked when the flow graph has converged into a steady-state system. * + * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method + * will not be invoked. + * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 55fa92df..c087a28d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -73,6 +73,12 @@ internal class FlowConsumerContextImpl( get() = _demand /** + * Flags to control the convergence of the consumer and source. + */ + override var shouldConsumerConverge: Boolean = false + override var shouldSourceConverge: Boolean = false + + /** * The clock to track simulation time. */ private val _clock = engine.clock @@ -114,7 +120,8 @@ internal class FlowConsumerContextImpl( */ private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastConvergence: Long = Long.MAX_VALUE // Last call to `onConvergence` + private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` + private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` /** * The timers at which the context is scheduled to be interrupted. @@ -199,8 +206,14 @@ internal class FlowConsumerContextImpl( * @return A flag to indicate whether the connection has already been updated before convergence. */ fun doUpdate(now: Long): Boolean { - val willConverge = _willConverge - _willConverge = true + // The connection will only converge if either the source or the consumer wants the converge callback to be + // invoked. + val shouldConverge = shouldSourceConverge || shouldConsumerConverge + var willConverge = false + if (shouldConverge) { + willConverge = _willConverge + _willConverge = true + } val oldState = _state if (oldState != State.Active) { @@ -286,19 +299,25 @@ internal class FlowConsumerContextImpl( /** * This method is invoked when the system converges into a steady state. */ - fun onConverge(timestamp: Long) { - val delta = max(0, timestamp - _lastConvergence) - _lastConvergence = timestamp + fun onConverge(now: Long) { _willConverge = false try { - if (_state == State.Active) { - source.onConverge(this, timestamp, delta) + if (_state == State.Active && shouldSourceConverge) { + val delta = max(0, now - _lastSourceConvergence) + _lastSourceConvergence = now + + source.onConverge(this, now, delta) } - logic.onConverge(this, timestamp, delta) + if (shouldConsumerConverge) { + val delta = max(0, now - _lastConsumerConvergence) + _lastConsumerConvergence = now + + logic.onConverge(this, now, delta) + } } catch (cause: Throwable) { - doFailSource(timestamp, cause) + doFailSource(now, cause) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index c7379fa9..7232df35 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -38,7 +38,7 @@ import kotlin.math.min */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - private val parent: FlowSystem? = null, + private val parent: FlowConvergenceListener? = null, private val interferenceDomain: InterferenceDomain? = null ) : FlowMultiplexer { /** @@ -269,6 +269,11 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this + + if (parent != null) { + ctx.shouldConsumerConverge = true + } + super.start(ctx) } @@ -289,7 +294,7 @@ public class MaxMinFlowMultiplexer( } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now) + parent?.onConverge(now, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 0c39523f..6dd60d95 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -47,6 +47,12 @@ public class FlowSourceRateAdapter( callback(0.0) } + override fun onStart(conn: FlowConnection, now: Long) { + conn.shouldSourceConverge = true + + delegate.onStart(conn, now) + } + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { try { delegate.onStop(conn, now, delta) @@ -60,9 +66,11 @@ public class FlowSourceRateAdapter( } override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { - delegate.onConverge(conn, now, delta) - - rate = conn.rate + try { + delegate.onConverge(conn, now, delta) + } finally { + rate = conn.rate + } } override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 3690e681..d548451f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -297,6 +297,10 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + conn.shouldSourceConverge = true + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return Long.MAX_VALUE } |
