From 559ac2327b8aa319fb8ab4558d4f4aa3382349f4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 16:27:45 +0200 Subject: perf(simulator): Make convergence callback optional This change adds two new properties for controlling whether the convergence callbacks of the source and consumer respectively should be invoked. This saves a lot of unnecessary calls for stages that do not have any implementation of the `onConvergence` method. --- .../opendc/simulator/compute/SimAbstractMachine.kt | 8 +++- .../simulator/compute/SimBareMetalMachine.kt | 4 +- .../org/opendc/simulator/compute/device/SimPsu.kt | 1 + .../compute/kernel/SimAbstractHypervisor.kt | 2 - .../compute/kernel/SimFairShareHypervisor.kt | 16 ++++---- .../kernel/SimFairShareHypervisorProvider.kt | 4 +- .../compute/kernel/SimHypervisorProvider.kt | 4 +- .../kernel/SimSpaceSharedHypervisorProvider.kt | 4 +- .../org/opendc/simulator/flow/FlowConnection.kt | 5 +++ .../opendc/simulator/flow/FlowConsumerContext.kt | 5 +++ .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 3 ++ .../simulator/flow/FlowConvergenceListener.kt | 36 ++++++++++++++++++ .../org/opendc/simulator/flow/FlowForwarder.kt | 10 +++++ .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 11 +++++- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 3 ++ .../kotlin/org/opendc/simulator/flow/FlowSystem.kt | 43 ---------------------- .../flow/internal/FlowConsumerContextImpl.kt | 39 +++++++++++++++----- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 9 ++++- .../simulator/flow/source/FlowSourceRateAdapter.kt | 14 +++++-- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 4 ++ 20 files changed, 145 insertions(+), 80 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 6a62d8a5..60a10f20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -43,9 +43,9 @@ import kotlin.coroutines.resume */ public abstract class SimAbstractMachine( protected val engine: FlowEngine, - final override val parent: FlowSystem?, + private val parent: FlowConvergenceListener?, final override val model: MachineModel -) : SimMachine, FlowSystem { +) : SimMachine, FlowConvergenceListener { /** * The resources allocated for this machine. */ @@ -116,6 +116,10 @@ public abstract class SimAbstractMachine( cancel() } + override fun onConverge(now: Long, delta: Long) { + parent?.onConverge(now, delta) + } + /** * Cancel the workload that is currently running on the machine. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 37cf282b..0bcf5957 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -46,7 +46,7 @@ public class SimBareMetalMachine( model: MachineModel, powerDriver: PowerDriver, public val psu: SimPsu = SimPsu(500.0, mapOf(1.0 to 1.0)), - parent: FlowSystem? = null, + parent: FlowConvergenceListener? = null, ) : SimAbstractMachine(engine, parent, model) { /** * The power draw of the machine onto the PSU. @@ -66,7 +66,7 @@ public class SimBareMetalMachine( */ private val powerDriverLogic = powerDriver.createLogic(this, cpus) - override fun onConverge(timestamp: Long) { + override fun onConverge(now: Long, delta: Long) { psu.update() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index 62d91c0b..09defbb5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -83,6 +83,7 @@ public class SimPsu( override fun createSource(): FlowSource = object : FlowSource { override fun onStart(conn: FlowConnection, now: Long) { _ctx = conn + conn.shouldSourceConverge = true } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index b145eefc..bcba8e8e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -145,8 +145,6 @@ public abstract class SimAbstractHypervisor( interferenceDomain?.leave(interferenceKey) } } - - override fun onConverge(timestamp: Long) {} } /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 36ab7c1c..b0515c6e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -28,8 +28,8 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSystem import org.opendc.simulator.flow.mux.FlowMultiplexer import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer @@ -45,7 +45,7 @@ import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer */ public class SimFairShareHypervisor( engine: FlowEngine, - private val parent: FlowSystem? = null, + private val parent: FlowConvergenceListener? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, private val listener: SimHypervisor.Listener? = null @@ -57,11 +57,9 @@ public class SimFairShareHypervisor( return SwitchSystem(ctx).switch } - private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowSystem { + private inner class SwitchSystem(private val ctx: SimMachineContext) : FlowConvergenceListener { val switch = MaxMinFlowMultiplexer(engine, this, interferenceDomain) - override val parent: FlowSystem? = this@SimFairShareHypervisor.parent - private var lastCpuUsage = 0.0 private var lastCpuDemand = 0.0 private var lastDemand = 0.0 @@ -70,11 +68,11 @@ public class SimFairShareHypervisor( private var lastInterference = 0.0 private var lastReport = Long.MIN_VALUE - override fun onConverge(timestamp: Long) { + override fun onConverge(now: Long, delta: Long) { val listener = listener ?: return val counters = switch.counters - if (timestamp > lastReport) { + if (now > lastReport) { listener.onSliceFinish( this@SimFairShareHypervisor, counters.demand - lastDemand, @@ -85,7 +83,7 @@ public class SimFairShareHypervisor( lastCpuDemand ) } - lastReport = timestamp + lastReport = now lastCpuDemand = switch.outputs.sumOf { it.demand } lastCpuUsage = switch.outputs.sumOf { it.rate } @@ -96,6 +94,8 @@ public class SimFairShareHypervisor( val load = lastCpuDemand / ctx.cpus.sumOf { it.model.frequency } triggerGovernors(load) + + parent?.onConverge(now, delta) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt index bfa099fb..e0a70926 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSystem /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. @@ -35,7 +35,7 @@ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, - parent: FlowSystem?, + parent: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt index 97f07097..dad2cc3b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSystem /** * A service provider interface for constructing a [SimHypervisor]. @@ -44,7 +44,7 @@ public interface SimHypervisorProvider { */ public fun create( engine: FlowEngine, - parent: FlowSystem? = null, + parent: FlowConvergenceListener? = null, scalingGovernor: ScalingGovernor? = null, interferenceDomain: VmInterferenceDomain? = null, listener: SimHypervisor.Listener? = null diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt index 7869d72d..93921eb9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorProvider.kt @@ -24,8 +24,8 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.kernel.cpufreq.ScalingGovernor import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain +import org.opendc.simulator.flow.FlowConvergenceListener import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSystem /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. @@ -35,7 +35,7 @@ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override fun create( engine: FlowEngine, - parent: FlowSystem?, + parent: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, interferenceDomain: VmInterferenceDomain?, listener: SimHypervisor.Listener? diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index fa833961..c327e1e9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -41,6 +41,11 @@ public interface FlowConnection : AutoCloseable { */ public val demand: Double + /** + * A flag to control whether [FlowSource.onConverge] should be invoked for this source. + */ + public var shouldSourceConverge: Boolean + /** * Pull the source. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index 75b2d25b..15f9b93b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -33,6 +33,11 @@ public interface FlowConsumerContext : FlowConnection { */ public override var capacity: Double + /** + * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer. + */ + public var shouldConsumerConverge: Boolean + /** * Start the flow over the connection. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index ef94ab22..50fbc9c7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -39,6 +39,9 @@ public interface FlowConsumerLogic { /** * This method is invoked when the flow graph has converged into a steady-state system. * + * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method + * will not be invoked. + * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt new file mode 100644 index 00000000..d1afda6f --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A listener interface for when a flow stage has converged into a steady-state. + */ +public interface FlowConvergenceListener { + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param now The timestamp at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. + */ + public fun onConverge(now: Long, delta: Long) {} +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index ab5b31c2..17de601a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -52,6 +52,12 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled * The exposed [FlowConnection]. */ private val _ctx = object : FlowConnection { + override var shouldSourceConverge: Boolean = false + set(value) { + field = value + _innerCtx?.shouldSourceConverge = value + } + override val capacity: Double get() = _innerCtx?.capacity ?: 0.0 @@ -141,6 +147,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onStart(conn: FlowConnection, now: Long) { _innerCtx = conn + + if (_ctx.shouldSourceConverge) { + conn.shouldSourceConverge = true + } } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index fc590177..549a338b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -32,9 +32,16 @@ package org.opendc.simulator.flow public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, - private val parent: FlowSystem? = null + private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + override fun start(ctx: FlowConsumerContext) { + if (parent != null) { + ctx.shouldConsumerConverge = true + } + super.start(ctx) + } + override fun createLogic(): FlowConsumerLogic { return object : FlowConsumerLogic { override fun onPush( @@ -52,7 +59,7 @@ public class FlowSink( } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now) + parent?.onConverge(now, delta) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index a4f624ef..3a7e52aa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -59,6 +59,9 @@ public interface FlowSource { /** * This method is invoked when the flow graph has converged into a steady-state system. * + * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method + * will not be invoked. + * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt deleted file mode 100644 index db6aa69f..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A system of possible multiple sub-resources. - * - * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the - * resource provider. - */ -public interface FlowSystem { - /** - * The parent system to which this system belongs or `null` if it has no parent. - */ - public val parent: FlowSystem? - - /** - * This method is invoked when the system has converged to a steady-state. - * - * @param timestamp The timestamp at which the system converged. - */ - public fun onConverge(timestamp: Long) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 55fa92df..c087a28d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -72,6 +72,12 @@ internal class FlowConsumerContextImpl( override val demand: Double get() = _demand + /** + * Flags to control the convergence of the consumer and source. + */ + override var shouldConsumerConverge: Boolean = false + override var shouldSourceConverge: Boolean = false + /** * The clock to track simulation time. */ @@ -114,7 +120,8 @@ internal class FlowConsumerContextImpl( */ private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastConvergence: Long = Long.MAX_VALUE // Last call to `onConvergence` + private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` + private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` /** * The timers at which the context is scheduled to be interrupted. @@ -199,8 +206,14 @@ internal class FlowConsumerContextImpl( * @return A flag to indicate whether the connection has already been updated before convergence. */ fun doUpdate(now: Long): Boolean { - val willConverge = _willConverge - _willConverge = true + // The connection will only converge if either the source or the consumer wants the converge callback to be + // invoked. + val shouldConverge = shouldSourceConverge || shouldConsumerConverge + var willConverge = false + if (shouldConverge) { + willConverge = _willConverge + _willConverge = true + } val oldState = _state if (oldState != State.Active) { @@ -286,19 +299,25 @@ internal class FlowConsumerContextImpl( /** * This method is invoked when the system converges into a steady state. */ - fun onConverge(timestamp: Long) { - val delta = max(0, timestamp - _lastConvergence) - _lastConvergence = timestamp + fun onConverge(now: Long) { _willConverge = false try { - if (_state == State.Active) { - source.onConverge(this, timestamp, delta) + if (_state == State.Active && shouldSourceConverge) { + val delta = max(0, now - _lastSourceConvergence) + _lastSourceConvergence = now + + source.onConverge(this, now, delta) } - logic.onConverge(this, timestamp, delta) + if (shouldConsumerConverge) { + val delta = max(0, now - _lastConsumerConvergence) + _lastConsumerConvergence = now + + logic.onConverge(this, now, delta) + } } catch (cause: Throwable) { - doFailSource(timestamp, cause) + doFailSource(now, cause) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index c7379fa9..7232df35 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -38,7 +38,7 @@ import kotlin.math.min */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - private val parent: FlowSystem? = null, + private val parent: FlowConvergenceListener? = null, private val interferenceDomain: InterferenceDomain? = null ) : FlowMultiplexer { /** @@ -269,6 +269,11 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this + + if (parent != null) { + ctx.shouldConsumerConverge = true + } + super.start(ctx) } @@ -289,7 +294,7 @@ public class MaxMinFlowMultiplexer( } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now) + parent?.onConverge(now, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 0c39523f..6dd60d95 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -47,6 +47,12 @@ public class FlowSourceRateAdapter( callback(0.0) } + override fun onStart(conn: FlowConnection, now: Long) { + conn.shouldSourceConverge = true + + delegate.onStart(conn, now) + } + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { try { delegate.onStop(conn, now, delta) @@ -60,9 +66,11 @@ public class FlowSourceRateAdapter( } override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { - delegate.onConverge(conn, now, delta) - - rate = conn.rate + try { + delegate.onConverge(conn, now, delta) + } finally { + rate = conn.rate + } } override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 3690e681..d548451f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -297,6 +297,10 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + conn.shouldSourceConverge = true + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return Long.MAX_VALUE } -- cgit v1.2.3