diff options
3 files changed, 28 insertions, 5 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt index 82f8df38..3f3bf6ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisor.kt @@ -37,7 +37,7 @@ public class SimSpaceSharedHypervisor( listener: FlowConvergenceListener?, scalingGovernor: ScalingGovernor?, ) : SimAbstractHypervisor(engine, listener, scalingGovernor) { - override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine) + override val mux: FlowMultiplexer = ForwardingFlowMultiplexer(engine, this) override fun canFit(model: MachineModel): Boolean { return mux.outputs.size - mux.inputs.size >= model.cpus.size diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index e3bdd7ba..a5663293 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -31,9 +31,14 @@ import kotlin.math.max * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. * * @param engine The [FlowEngine] the forwarder runs in. + * @param listener The convergence lister to use. * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. */ -public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { +public class FlowForwarder( + private val engine: FlowEngine, + private val listener: FlowConvergenceListener? = null, + private val isCoupled: Boolean = false +) : FlowSource, FlowConsumer, AutoCloseable { /** * The logging instance of this connection. */ @@ -153,7 +158,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onStart(conn: FlowConnection, now: Long) { _innerCtx = conn - if (_ctx.shouldSourceConverge) { + if (listener != null || _ctx.shouldSourceConverge) { conn.shouldSourceConverge = true } } @@ -196,6 +201,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { try { delegate?.onConverge(this._ctx, now, delta) + listener?.onConverge(now, delta) } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index b68a8baa..eff111b8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque +import kotlin.math.max /** * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means @@ -32,8 +33,12 @@ import java.util.ArrayDeque * inputs as outputs. * * @param engine The [FlowEngine] driving the simulation. + * @param listener The convergence listener of the multiplexer. */ -public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer { +public class ForwardingFlowMultiplexer( + private val engine: FlowEngine, + private val listener: FlowConvergenceListener? = null +) : FlowMultiplexer, FlowConvergenceListener { override val inputs: Set<FlowConsumer> get() = _inputs private val _inputs = mutableSetOf<Input>() @@ -91,7 +96,7 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul } override fun newOutput(): FlowSource { - val forwarder = FlowForwarder(engine) + val forwarder = FlowForwarder(engine, this) val output = Output(forwarder) _outputs += output @@ -130,6 +135,18 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul clearInputs() } + private var _lastConverge = Long.MAX_VALUE + + override fun onConverge(now: Long, delta: Long) { + val listener = listener + if (listener != null) { + val lastConverge = _lastConverge + _lastConverge = now + val duration = max(0, now - lastConverge) + listener.onConverge(now, duration) + } + } + /** * An input on the multiplexer. */ |
