summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-01 22:04:35 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:41 +0200
commit081221684fb826ab5a00c1d8cc5a9886b9e2203c (patch)
tree7f2202429256b4cb96812f96b682a021f8236180 /opendc-simulator/opendc-simulator-flow/src
parent6e424e9b44687d01e618e7bc38afc427610cd845 (diff)
feat(simulator): Expose CPU time counters directly on hypervisor
This change adds a new interface to the SimHypervisor interface that exposes the CPU time counters directly. These are derived from the flow counters and will be used by SimHost to expose them via telemetry.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt114
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt164
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt)10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt12
11 files changed, 270 insertions, 119 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index 4834f10f..e927f81d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -83,8 +83,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -96,8 +96,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(3) {
launch {
@@ -113,8 +113,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -126,8 +126,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(2) {
launch {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index c8092082..b02426e3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
/**
* The previous demand for the consumer.
*/
- private var previousDemand = 0.0
+ private var _previousDemand = 0.0
+ private var _previousCapacity = 0.0
/**
* Update the counters of the flow consumer.
*/
protected fun updateCounters(ctx: FlowConnection, delta: Long) {
- val demand = previousDemand
- previousDemand = ctx.demand
+ val demand = _previousDemand
+ val capacity = _previousCapacity
+
+ _previousDemand = ctx.demand
+ _previousCapacity = ctx.capacity
if (delta <= 0) {
return
@@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
val counters = _counters
val deltaS = delta / 1000.0
- val work = demand * deltaS
+ val total = demand * deltaS
+ val work = capacity * deltaS
val actualWork = ctx.rate * deltaS
- val remainingWork = work - actualWork
counters.demand += work
counters.actual += actualWork
- counters.overcommit += remainingWork
+ counters.remaining += (total - actualWork)
}
/**
* Update the counters of the flow consumer.
*/
- protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
+ protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
val counters = _counters
counters.demand += demand
counters.actual += actual
- counters.overcommit += overcommit
+ counters.remaining += remaining
}
final override fun startConsumer(source: FlowSource) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index e15d7643..a717ae6e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -37,9 +37,9 @@ public interface FlowCounters {
public val actual: Double
/**
- * The accumulated flow that could not be transferred over the connection.
+ * The amount of capacity that was not utilized.
*/
- public val overcommit: Double
+ public val remaining: Double
/**
* The accumulated flow lost due to interference between sources.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 17de601a..7eaaf6c2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
val counters = _counters
val deltaS = delta / 1000.0
+ val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
counters.demand += work
counters.actual += actualWork
- counters.overcommit += (work - actualWork)
+ counters.remaining += (total - actualWork)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
index 141d335d..d2fa5228 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters
internal class FlowCountersImpl : FlowCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
- override var overcommit: Double = 0.0
+ override var remaining: Double = 0.0
override var interference: Double = 0.0
override fun reset() {
demand = 0.0
actual = 0.0
- overcommit = 0.0
+ remaining = 0.0
interference = 0.0
}
override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 17b82391..04ba7f21 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -39,7 +39,22 @@ public interface FlowMultiplexer {
/**
* The outputs of the multiplexer over which the flows will be distributed.
*/
- public val outputs: Set<FlowConsumer>
+ public val outputs: Set<FlowSource>
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ public val rate: Double
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ public val demand: Double
+
+ /**
+ * The capacity of the outputs.
+ */
+ public val capacity: Double
/**
* The flow counters to track the flow metrics of all multiplexer inputs.
@@ -59,12 +74,27 @@ public interface FlowMultiplexer {
public fun removeInput(input: FlowConsumer)
/**
- * Add the specified [output] to the multiplexer.
+ * Create a new output on this multiplexer.
*/
- public fun addOutput(output: FlowConsumer)
+ public fun newOutput(): FlowSource
/**
- * Clear all inputs and outputs from the switch.
+ * Remove [output] from this multiplexer.
+ */
+ public fun removeOutput(output: FlowSource)
+
+ /**
+ * Clear all inputs and outputs from the multiplexer.
*/
public fun clear()
+
+ /**
+ * Clear the inputs of the multiplexer.
+ */
+ public fun clearInputs()
+
+ /**
+ * Clear the outputs of the multiplexer.
+ */
+ public fun clearOutputs()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 6dd9dcfb..125d10fe 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
get() = _inputs
private val _inputs = mutableSetOf<Input>()
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
- private val _availableOutputs = ArrayDeque<FlowForwarder>()
+ private val _outputs = mutableSetOf<Output>()
+ private val _availableOutputs = ArrayDeque<Output>()
override val counters: FlowCounters = object : FlowCounters {
override val demand: Double
- get() = _outputs.sumOf { it.counters.demand }
+ get() = _outputs.sumOf { it.forwarder.counters.demand }
override val actual: Double
- get() = _outputs.sumOf { it.counters.actual }
- override val overcommit: Double
- get() = _outputs.sumOf { it.counters.overcommit }
+ get() = _outputs.sumOf { it.forwarder.counters.actual }
+ override val remaining: Double
+ get() = _outputs.sumOf { it.forwarder.counters.remaining }
override val interference: Double
- get() = _outputs.sumOf { it.counters.interference }
+ get() = _outputs.sumOf { it.forwarder.counters.interference }
override fun reset() {
- for (input in _outputs) {
- input.counters.reset()
+ for (output in _outputs) {
+ output.forwarder.counters.reset()
}
}
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
}
+ override val rate: Double
+ get() = _outputs.sumOf { it.forwarder.rate }
+
+ override val demand: Double
+ get() = _outputs.sumOf { it.forwarder.demand }
+
+ override val capacity: Double
+ get() = _outputs.sumOf { it.forwarder.capacity }
+
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val output = Input(forwarder)
- _inputs += output
- return output
+ val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val input = Input(output)
+ _inputs += input
+ return input
}
override fun removeInput(input: FlowConsumer) {
@@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
return
}
- (input as Input).close()
+ val output = (input as Input).output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- override fun addOutput(output: FlowConsumer) {
- if (output in outputs) {
- return
- }
-
+ override fun newOutput(): FlowSource {
val forwarder = FlowForwarder(engine)
+ val output = Output(forwarder)
_outputs += output
- _availableOutputs += forwarder
+ return output
+ }
- output.startConsumer(object : FlowSource by forwarder {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _outputs -= output
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
- forwarder.onStop(conn, now, delta)
- }
- })
+ val forwarder = (output as Output).forwarder
+ forwarder.close()
}
- override fun clear() {
- for (input in _outputs) {
- input.cancel()
+ override fun clearInputs() {
+ for (input in _inputs) {
+ val output = input.output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- _outputs.clear()
- // Inputs are implicitly cancelled by the output forwarders
_inputs.clear()
}
+ override fun clearOutputs() {
+ for (output in _outputs) {
+ output.forwarder.cancel()
+ }
+ _outputs.clear()
+ _availableOutputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
+ }
+
/**
* An input on the multiplexer.
*/
- private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
- /**
- * Close the input.
- */
- fun close() {
- // We explicitly do not close the forwarder here in order to re-use it across input resources.
- _inputs -= this
- _availableOutputs += forwarder
+ private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+
+ /**
+ * An output on the multiplexer.
+ */
+ private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _availableOutputs += this
+ forwarder.onStart(conn, now)
}
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ forwarder.cancel()
+ forwarder.onStop(conn, now, delta)
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Output"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 7232df35..5ff0fb8d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer(
/**
* The outputs of the multiplexer.
*/
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _outputs = mutableSetOf<Output>()
private val _activeOutputs = mutableListOf<Output>()
/**
@@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer(
/**
* The actual processing rate of the multiplexer.
*/
+ public override val rate: Double
+ get() = _rate
private var _rate = 0.0
/**
* The demanded processing rate of the input.
*/
+ public override val demand: Double
+ get() = _demand
private var _demand = 0.0
/**
* The capacity of the outputs.
*/
+ public override val capacity: Double
+ get() = _capacity
private var _capacity = 0.0
/**
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
override fun newInput(key: InterferenceKey?): FlowConsumer {
val provider = Input(_capacity, key)
@@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer(
return provider
}
- override fun addOutput(output: FlowConsumer) {
- val consumer = Output(output)
- if (_outputs.add(output)) {
- _activeOutputs.add(consumer)
- output.startConsumer(consumer)
- }
- }
-
override fun removeInput(input: FlowConsumer) {
if (!_inputs.remove(input)) {
return
@@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer(
(input as Input).close()
}
- override fun clear() {
- for (input in _activeOutputs) {
+ override fun newOutput(): FlowSource {
+ val output = Output()
+ _outputs.add(output)
+ return output
+ }
+
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ // This cast should always succeed since only `Output` instances should be added to `_outputs`
+ (output as Output).cancel()
+ }
+
+ override fun clearInputs() {
+ for (input in _inputs) {
input.cancel()
}
- _activeOutputs.clear()
+ _inputs.clear()
+ }
- for (output in _activeInputs) {
+ override fun clearOutputs() {
+ for (output in _outputs) {
output.cancel()
}
- _activeInputs.clear()
+ _outputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
}
/**
@@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer(
if (_schedulerActive) {
return
}
-
+ val lastSchedulerCycle = _lastSchedulerCycle
+ val delta = max(0, now - lastSchedulerCycle)
_schedulerActive = true
+ _lastSchedulerCycle = now
+
try {
- doSchedule(now)
+ doSchedule(now, delta)
} finally {
_schedulerActive = false
}
@@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer(
/**
* Schedule the inputs over the outputs.
*/
- private fun doSchedule(now: Long) {
+ private fun doSchedule(now: Long, delta: Long) {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
// If there is no work yet, mark the inputs as idle.
if (activeInputs.isEmpty()) {
+ _demand = 0.0
+ _rate = 0.0
return
}
@@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer(
// Remove outputs that have finished
if (!input.isActive) {
+ input.actualRate = 0.0
inputIterator.remove()
}
}
@@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer(
// Divide the available output capacity fairly over the inputs using max-min fair sharing
var remaining = activeInputs.size
- for (input in activeInputs) {
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
val availableShare = availableCapacity / remaining--
val grantedRate = min(input.allowedRate, availableShare)
@@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer(
activeOutputs.sort()
// Divide the requests over the available capacity of the input resources fairly
- for (output in activeOutputs) {
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
val inputCapacity = output.capacity
val fraction = inputCapacity / capacity
val grantedSpeed = rate * fraction
@@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer(
}
/**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
+
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = _capacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta / 1000.0
+
+ _counters.demand += _demand * deltaS
+ _counters.actual += _rate * deltaS
+ _counters.remaining += (previousCapacity - _rate) * deltaS
+ }
+
+ /**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private inner class Input(capacity: Double, val key: InterferenceKey?) :
@@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer(
private var _lastPull: Long = Long.MIN_VALUE
/**
+ * The interference domain this input belongs to.
+ */
+ private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain
+
+ /**
* Close the input.
*
* This method is invoked when the user removes an input from the switch.
@@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer(
check(!_isClosed) { "Cannot re-use closed input" }
_activeInputs += this
-
if (parent != null) {
ctx.shouldConsumerConverge = true
}
@@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer(
doUpdateCounters(delta)
actualRate = 0.0
- this.limit = rate
+ limit = rate
_lastPull = now
runScheduler(now)
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ val lastConverge = _lastConverge
+ val parent = parent
+
+ if (parent != null && (lastConverge < now || _lastConvergeInput == null)) {
+ _lastConverge = now
+ _lastConvergeInput = this
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
@@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
_lastPull = now
+
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == this) {
+ _lastConvergeInput = null
+ }
+
+ // Re-run scheduler to distribute new load
+ runScheduler(now)
}
/* Comparable */
@@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer(
// Compute the performance penalty due to flow interference
val perfScore = if (interferenceDomain != null) {
- val load = _rate / capacity
+ val load = _rate / _capacity
interferenceDomain.apply(key, load)
} else {
1.0
}
val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualRate * deltaS
- val remainingWork = work - actualWork
+ val demand = limit * deltaS
+ val actual = actualRate * deltaS
+ val remaining = (capacity - actualRate) * deltaS
- updateCounters(work, actualWork, remainingWork)
+ updateCounters(demand, actual, remaining)
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ _counters.interference += actual * max(0.0, 1 - perfScore)
}
}
/**
* An internal [FlowSource] implementation for multiplexer outputs.
*/
- private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ private inner class Output : FlowSource, Comparable<Output> {
/**
* The active [FlowConnection] of this source.
*/
- private var _ctx: FlowConnection? = null
+ private var _conn: FlowConnection? = null
/**
* The capacity of this output.
@@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer(
* Push the specified rate to the consumer.
*/
fun push(rate: Double) {
- _ctx?.push(rate)
+ _conn?.push(rate)
}
/**
* Cancel this output.
*/
fun cancel() {
- provider.cancel()
+ _conn?.close()
}
override fun onStart(conn: FlowConnection, now: Long) {
- assert(_ctx == null) { "Source running concurrently" }
- _ctx = conn
+ assert(_conn == null) { "Source running concurrently" }
+ _conn = conn
capacity = conn.capacity
+ _activeOutputs.add(this)
+
updateCapacity()
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _ctx = null
+ _conn = null
capacity = 0.0
+ _activeOutputs.remove(this)
+
updateCapacity()
+
+ runScheduler(now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
@@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer(
updateCapacity()
}
+ // Re-run scheduler to distribute new load
runScheduler(now)
return Long.MAX_VALUE
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index d548451f..12e72b8f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -217,7 +217,7 @@ internal class FlowForwarderTest {
assertEquals(2.0, source.counters.actual)
assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
assertEquals(2000, clock.millis())
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 3475f027..187dacd9 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource
/**
* Test suite for the [ForwardingFlowMultiplexer] class.
*/
-internal class ExclusiveFlowMultiplexerTest {
+internal class ForwardingFlowMultiplexerTest {
/**
* Test a trace workload.
*/
@@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest {
val forwarder = FlowForwarder(engine)
val adapter = FlowSourceRateAdapter(forwarder, speed::add)
source.startConsumer(adapter)
- switch.addOutput(forwarder)
+ forwarder.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
switch.newInput()
assertThrows<IllegalStateException> { switch.newInput() }
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
index 9f6b8a2c..6e2cdb98 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest {
val switch = MaxMinFlowMultiplexer(scheduler)
val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { switch.addOutput(it) }
+ sources.forEach { it.startConsumer(switch.newOutput()) }
val provider = switch.newInput()
val consumer = FixedFlowSource(2000.0, 1.0)
@@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val provider = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
provider.consume(workload)
yield()
} finally {
@@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val providerA = switch.newInput()
val providerB = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}