summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-12-09 15:55:20 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-12-09 15:57:13 +0100
commit0d74d2f3bbaae1581bd140c8f157ef61bdf5f842 (patch)
tree8eb393b982edd91677146f33d823313799d2f482 /opendc-simulator/opendc-simulator-flow/src/main
parent4b37f82c781c3342fc41aecd30787ca3d82b8aa9 (diff)
fix(simulator): Support convergence of space shared hypervisor
This change addresses an issue with the SimSpaceSharedHypervisor implementation where it did not emit convergence events due to missing implementation. This caused issues with users of this class trying to obtain usage data, which depended on these events being emitted.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt21
2 files changed, 27 insertions, 4 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index e3bdd7ba..a5663293 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -31,9 +31,14 @@ import kotlin.math.max
* A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
*
* @param engine The [FlowEngine] the forwarder runs in.
+ * @param listener The convergence lister to use.
* @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
*/
-public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable {
+public class FlowForwarder(
+ private val engine: FlowEngine,
+ private val listener: FlowConvergenceListener? = null,
+ private val isCoupled: Boolean = false
+) : FlowSource, FlowConsumer, AutoCloseable {
/**
* The logging instance of this connection.
*/
@@ -153,7 +158,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
override fun onStart(conn: FlowConnection, now: Long) {
_innerCtx = conn
- if (_ctx.shouldSourceConverge) {
+ if (listener != null || _ctx.shouldSourceConverge) {
conn.shouldSourceConverge = true
}
}
@@ -196,6 +201,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
try {
delegate?.onConverge(this._ctx, now, delta)
+ listener?.onConverge(now, delta)
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index b68a8baa..eff111b8 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceKey
import java.util.ArrayDeque
+import kotlin.math.max
/**
* A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
@@ -32,8 +33,12 @@ import java.util.ArrayDeque
* inputs as outputs.
*
* @param engine The [FlowEngine] driving the simulation.
+ * @param listener The convergence listener of the multiplexer.
*/
-public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer {
+public class ForwardingFlowMultiplexer(
+ private val engine: FlowEngine,
+ private val listener: FlowConvergenceListener? = null
+) : FlowMultiplexer, FlowConvergenceListener {
override val inputs: Set<FlowConsumer>
get() = _inputs
private val _inputs = mutableSetOf<Input>()
@@ -91,7 +96,7 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
}
override fun newOutput(): FlowSource {
- val forwarder = FlowForwarder(engine)
+ val forwarder = FlowForwarder(engine, this)
val output = Output(forwarder)
_outputs += output
@@ -130,6 +135,18 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
clearInputs()
}
+ private var _lastConverge = Long.MAX_VALUE
+
+ override fun onConverge(now: Long, delta: Long) {
+ val listener = listener
+ if (listener != null) {
+ val lastConverge = _lastConverge
+ _lastConverge = now
+ val duration = max(0, now - lastConverge)
+ listener.onConverge(now, duration)
+ }
+ }
+
/**
* An input on the multiplexer.
*/