diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-12-09 15:55:20 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-12-09 15:57:13 +0100 |
| commit | 0d74d2f3bbaae1581bd140c8f157ef61bdf5f842 (patch) | |
| tree | 8eb393b982edd91677146f33d823313799d2f482 /opendc-simulator/opendc-simulator-flow/src | |
| parent | 4b37f82c781c3342fc41aecd30787ca3d82b8aa9 (diff) | |
fix(simulator): Support convergence of space shared hypervisor
This change addresses an issue with the SimSpaceSharedHypervisor
implementation where it did not emit convergence events due to missing
implementation. This caused issues with users of this class trying to
obtain usage data, which depended on these events being emitted.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
2 files changed, 27 insertions, 4 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index e3bdd7ba..a5663293 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -31,9 +31,14 @@ import kotlin.math.max * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. * * @param engine The [FlowEngine] the forwarder runs in. + * @param listener The convergence lister to use. * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. */ -public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { +public class FlowForwarder( + private val engine: FlowEngine, + private val listener: FlowConvergenceListener? = null, + private val isCoupled: Boolean = false +) : FlowSource, FlowConsumer, AutoCloseable { /** * The logging instance of this connection. */ @@ -153,7 +158,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onStart(conn: FlowConnection, now: Long) { _innerCtx = conn - if (_ctx.shouldSourceConverge) { + if (listener != null || _ctx.shouldSourceConverge) { conn.shouldSourceConverge = true } } @@ -196,6 +201,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { try { delegate?.onConverge(this._ctx, now, delta) + listener?.onConverge(now, delta) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index b68a8baa..eff111b8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque +import kotlin.math.max /** * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means @@ -32,8 +33,12 @@ import java.util.ArrayDeque * inputs as outputs. * * @param engine The [FlowEngine] driving the simulation. + * @param listener The convergence listener of the multiplexer. */ -public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer { +public class ForwardingFlowMultiplexer( + private val engine: FlowEngine, + private val listener: FlowConvergenceListener? = null +) : FlowMultiplexer, FlowConvergenceListener { override val inputs: Set<FlowConsumer> get() = _inputs private val _inputs = mutableSetOf<Input>() @@ -91,7 +96,7 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul } override fun newOutput(): FlowSource { - val forwarder = FlowForwarder(engine) + val forwarder = FlowForwarder(engine, this) val output = Output(forwarder) _outputs += output @@ -130,6 +135,18 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul clearInputs() } + private var _lastConverge = Long.MAX_VALUE + + override fun onConverge(now: Long, delta: Long) { + val listener = listener + if (listener != null) { + val lastConverge = _lastConverge + _lastConverge = now + val duration = max(0, now - lastConverge) + listener.onConverge(now, duration) + } + } + /** * An input on the multiplexer. */ |
