summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt114
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt164
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt)10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt12
11 files changed, 270 insertions, 119 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index 4834f10f..e927f81d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -83,8 +83,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -96,8 +96,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = MaxMinFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(3) {
launch {
@@ -113,8 +113,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
val provider = switch.newInput()
return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
@@ -126,8 +126,8 @@ class FlowBenchmarks {
return scope.runBlockingSimulation {
val switch = ForwardingFlowMultiplexer(engine)
- switch.addOutput(FlowSink(engine, 3000.0))
- switch.addOutput(FlowSink(engine, 3000.0))
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
repeat(2) {
launch {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index c8092082..b02426e3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
/**
* The previous demand for the consumer.
*/
- private var previousDemand = 0.0
+ private var _previousDemand = 0.0
+ private var _previousCapacity = 0.0
/**
* Update the counters of the flow consumer.
*/
protected fun updateCounters(ctx: FlowConnection, delta: Long) {
- val demand = previousDemand
- previousDemand = ctx.demand
+ val demand = _previousDemand
+ val capacity = _previousCapacity
+
+ _previousDemand = ctx.demand
+ _previousCapacity = ctx.capacity
if (delta <= 0) {
return
@@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
val counters = _counters
val deltaS = delta / 1000.0
- val work = demand * deltaS
+ val total = demand * deltaS
+ val work = capacity * deltaS
val actualWork = ctx.rate * deltaS
- val remainingWork = work - actualWork
counters.demand += work
counters.actual += actualWork
- counters.overcommit += remainingWork
+ counters.remaining += (total - actualWork)
}
/**
* Update the counters of the flow consumer.
*/
- protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
+ protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
val counters = _counters
counters.demand += demand
counters.actual += actual
- counters.overcommit += overcommit
+ counters.remaining += remaining
}
final override fun startConsumer(source: FlowSource) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
index e15d7643..a717ae6e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -37,9 +37,9 @@ public interface FlowCounters {
public val actual: Double
/**
- * The accumulated flow that could not be transferred over the connection.
+ * The amount of capacity that was not utilized.
*/
- public val overcommit: Double
+ public val remaining: Double
/**
* The accumulated flow lost due to interference between sources.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 17de601a..7eaaf6c2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
val counters = _counters
val deltaS = delta / 1000.0
+ val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
counters.demand += work
counters.actual += actualWork
- counters.overcommit += (work - actualWork)
+ counters.remaining += (total - actualWork)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
index 141d335d..d2fa5228 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters
internal class FlowCountersImpl : FlowCounters {
override var demand: Double = 0.0
override var actual: Double = 0.0
- override var overcommit: Double = 0.0
+ override var remaining: Double = 0.0
override var interference: Double = 0.0
override fun reset() {
demand = 0.0
actual = 0.0
- overcommit = 0.0
+ remaining = 0.0
interference = 0.0
}
override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index 17b82391..04ba7f21 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -39,7 +39,22 @@ public interface FlowMultiplexer {
/**
* The outputs of the multiplexer over which the flows will be distributed.
*/
- public val outputs: Set<FlowConsumer>
+ public val outputs: Set<FlowSource>
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ public val rate: Double
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ public val demand: Double
+
+ /**
+ * The capacity of the outputs.
+ */
+ public val capacity: Double
/**
* The flow counters to track the flow metrics of all multiplexer inputs.
@@ -59,12 +74,27 @@ public interface FlowMultiplexer {
public fun removeInput(input: FlowConsumer)
/**
- * Add the specified [output] to the multiplexer.
+ * Create a new output on this multiplexer.
*/
- public fun addOutput(output: FlowConsumer)
+ public fun newOutput(): FlowSource
/**
- * Clear all inputs and outputs from the switch.
+ * Remove [output] from this multiplexer.
+ */
+ public fun removeOutput(output: FlowSource)
+
+ /**
+ * Clear all inputs and outputs from the multiplexer.
*/
public fun clear()
+
+ /**
+ * Clear the inputs of the multiplexer.
+ */
+ public fun clearInputs()
+
+ /**
+ * Clear the outputs of the multiplexer.
+ */
+ public fun clearOutputs()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 6dd9dcfb..125d10fe 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
get() = _inputs
private val _inputs = mutableSetOf<Input>()
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
- private val _availableOutputs = ArrayDeque<FlowForwarder>()
+ private val _outputs = mutableSetOf<Output>()
+ private val _availableOutputs = ArrayDeque<Output>()
override val counters: FlowCounters = object : FlowCounters {
override val demand: Double
- get() = _outputs.sumOf { it.counters.demand }
+ get() = _outputs.sumOf { it.forwarder.counters.demand }
override val actual: Double
- get() = _outputs.sumOf { it.counters.actual }
- override val overcommit: Double
- get() = _outputs.sumOf { it.counters.overcommit }
+ get() = _outputs.sumOf { it.forwarder.counters.actual }
+ override val remaining: Double
+ get() = _outputs.sumOf { it.forwarder.counters.remaining }
override val interference: Double
- get() = _outputs.sumOf { it.counters.interference }
+ get() = _outputs.sumOf { it.forwarder.counters.interference }
override fun reset() {
- for (input in _outputs) {
- input.counters.reset()
+ for (output in _outputs) {
+ output.forwarder.counters.reset()
}
}
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
}
+ override val rate: Double
+ get() = _outputs.sumOf { it.forwarder.rate }
+
+ override val demand: Double
+ get() = _outputs.sumOf { it.forwarder.demand }
+
+ override val capacity: Double
+ get() = _outputs.sumOf { it.forwarder.capacity }
+
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val output = Input(forwarder)
- _inputs += output
- return output
+ val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val input = Input(output)
+ _inputs += input
+ return input
}
override fun removeInput(input: FlowConsumer) {
@@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
return
}
- (input as Input).close()
+ val output = (input as Input).output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- override fun addOutput(output: FlowConsumer) {
- if (output in outputs) {
- return
- }
-
+ override fun newOutput(): FlowSource {
val forwarder = FlowForwarder(engine)
+ val output = Output(forwarder)
_outputs += output
- _availableOutputs += forwarder
+ return output
+ }
- output.startConsumer(object : FlowSource by forwarder {
- override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _outputs -= output
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
- forwarder.onStop(conn, now, delta)
- }
- })
+ val forwarder = (output as Output).forwarder
+ forwarder.close()
}
- override fun clear() {
- for (input in _outputs) {
- input.cancel()
+ override fun clearInputs() {
+ for (input in _inputs) {
+ val output = input.output
+ output.forwarder.cancel()
+ _availableOutputs += output
}
- _outputs.clear()
- // Inputs are implicitly cancelled by the output forwarders
_inputs.clear()
}
+ override fun clearOutputs() {
+ for (output in _outputs) {
+ output.forwarder.cancel()
+ }
+ _outputs.clear()
+ _availableOutputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
+ }
+
/**
* An input on the multiplexer.
*/
- private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
- /**
- * Close the input.
- */
- fun close() {
- // We explicitly do not close the forwarder here in order to re-use it across input resources.
- _inputs -= this
- _availableOutputs += forwarder
+ private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+
+ /**
+ * An output on the multiplexer.
+ */
+ private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _availableOutputs += this
+ forwarder.onStart(conn, now)
}
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ forwarder.cancel()
+ forwarder.onStop(conn, now, delta)
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Output"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 7232df35..5ff0fb8d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer(
/**
* The outputs of the multiplexer.
*/
- override val outputs: Set<FlowConsumer>
+ override val outputs: Set<FlowSource>
get() = _outputs
- private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _outputs = mutableSetOf<Output>()
private val _activeOutputs = mutableListOf<Output>()
/**
@@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer(
/**
* The actual processing rate of the multiplexer.
*/
+ public override val rate: Double
+ get() = _rate
private var _rate = 0.0
/**
* The demanded processing rate of the input.
*/
+ public override val demand: Double
+ get() = _demand
private var _demand = 0.0
/**
* The capacity of the outputs.
*/
+ public override val capacity: Double
+ get() = _capacity
private var _capacity = 0.0
/**
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
override fun newInput(key: InterferenceKey?): FlowConsumer {
val provider = Input(_capacity, key)
@@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer(
return provider
}
- override fun addOutput(output: FlowConsumer) {
- val consumer = Output(output)
- if (_outputs.add(output)) {
- _activeOutputs.add(consumer)
- output.startConsumer(consumer)
- }
- }
-
override fun removeInput(input: FlowConsumer) {
if (!_inputs.remove(input)) {
return
@@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer(
(input as Input).close()
}
- override fun clear() {
- for (input in _activeOutputs) {
+ override fun newOutput(): FlowSource {
+ val output = Output()
+ _outputs.add(output)
+ return output
+ }
+
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ // This cast should always succeed since only `Output` instances should be added to `_outputs`
+ (output as Output).cancel()
+ }
+
+ override fun clearInputs() {
+ for (input in _inputs) {
input.cancel()
}
- _activeOutputs.clear()
+ _inputs.clear()
+ }
- for (output in _activeInputs) {
+ override fun clearOutputs() {
+ for (output in _outputs) {
output.cancel()
}
- _activeInputs.clear()
+ _outputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
}
/**
@@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer(
if (_schedulerActive) {
return
}
-
+ val lastSchedulerCycle = _lastSchedulerCycle
+ val delta = max(0, now - lastSchedulerCycle)
_schedulerActive = true
+ _lastSchedulerCycle = now
+
try {
- doSchedule(now)
+ doSchedule(now, delta)
} finally {
_schedulerActive = false
}
@@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer(
/**
* Schedule the inputs over the outputs.
*/
- private fun doSchedule(now: Long) {
+ private fun doSchedule(now: Long, delta: Long) {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
// If there is no work yet, mark the inputs as idle.
if (activeInputs.isEmpty()) {
+ _demand = 0.0
+ _rate = 0.0
return
}
@@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer(
// Remove outputs that have finished
if (!input.isActive) {
+ input.actualRate = 0.0
inputIterator.remove()
}
}
@@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer(
// Divide the available output capacity fairly over the inputs using max-min fair sharing
var remaining = activeInputs.size
- for (input in activeInputs) {
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
val availableShare = availableCapacity / remaining--
val grantedRate = min(input.allowedRate, availableShare)
@@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer(
activeOutputs.sort()
// Divide the requests over the available capacity of the input resources fairly
- for (output in activeOutputs) {
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
val inputCapacity = output.capacity
val fraction = inputCapacity / capacity
val grantedSpeed = rate * fraction
@@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer(
}
/**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
+
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = _capacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta / 1000.0
+
+ _counters.demand += _demand * deltaS
+ _counters.actual += _rate * deltaS
+ _counters.remaining += (previousCapacity - _rate) * deltaS
+ }
+
+ /**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private inner class Input(capacity: Double, val key: InterferenceKey?) :
@@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer(
private var _lastPull: Long = Long.MIN_VALUE
/**
+ * The interference domain this input belongs to.
+ */
+ private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain
+
+ /**
* Close the input.
*
* This method is invoked when the user removes an input from the switch.
@@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer(
check(!_isClosed) { "Cannot re-use closed input" }
_activeInputs += this
-
if (parent != null) {
ctx.shouldConsumerConverge = true
}
@@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer(
doUpdateCounters(delta)
actualRate = 0.0
- this.limit = rate
+ limit = rate
_lastPull = now
runScheduler(now)
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ val lastConverge = _lastConverge
+ val parent = parent
+
+ if (parent != null && (lastConverge < now || _lastConvergeInput == null)) {
+ _lastConverge = now
+ _lastConvergeInput = this
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
@@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
_lastPull = now
+
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == this) {
+ _lastConvergeInput = null
+ }
+
+ // Re-run scheduler to distribute new load
+ runScheduler(now)
}
/* Comparable */
@@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer(
// Compute the performance penalty due to flow interference
val perfScore = if (interferenceDomain != null) {
- val load = _rate / capacity
+ val load = _rate / _capacity
interferenceDomain.apply(key, load)
} else {
1.0
}
val deltaS = delta / 1000.0
- val work = limit * deltaS
- val actualWork = actualRate * deltaS
- val remainingWork = work - actualWork
+ val demand = limit * deltaS
+ val actual = actualRate * deltaS
+ val remaining = (capacity - actualRate) * deltaS
- updateCounters(work, actualWork, remainingWork)
+ updateCounters(demand, actual, remaining)
- val distCounters = _counters
- distCounters.demand += work
- distCounters.actual += actualWork
- distCounters.overcommit += remainingWork
- distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ _counters.interference += actual * max(0.0, 1 - perfScore)
}
}
/**
* An internal [FlowSource] implementation for multiplexer outputs.
*/
- private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ private inner class Output : FlowSource, Comparable<Output> {
/**
* The active [FlowConnection] of this source.
*/
- private var _ctx: FlowConnection? = null
+ private var _conn: FlowConnection? = null
/**
* The capacity of this output.
@@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer(
* Push the specified rate to the consumer.
*/
fun push(rate: Double) {
- _ctx?.push(rate)
+ _conn?.push(rate)
}
/**
* Cancel this output.
*/
fun cancel() {
- provider.cancel()
+ _conn?.close()
}
override fun onStart(conn: FlowConnection, now: Long) {
- assert(_ctx == null) { "Source running concurrently" }
- _ctx = conn
+ assert(_conn == null) { "Source running concurrently" }
+ _conn = conn
capacity = conn.capacity
+ _activeOutputs.add(this)
+
updateCapacity()
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
- _ctx = null
+ _conn = null
capacity = 0.0
+ _activeOutputs.remove(this)
+
updateCapacity()
+
+ runScheduler(now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
@@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer(
updateCapacity()
}
+ // Re-run scheduler to distribute new load
runScheduler(now)
return Long.MAX_VALUE
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index d548451f..12e72b8f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -217,7 +217,7 @@ internal class FlowForwarderTest {
assertEquals(2.0, source.counters.actual)
assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
- assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
assertEquals(2000, clock.millis())
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
index 3475f027..187dacd9 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource
/**
* Test suite for the [ForwardingFlowMultiplexer] class.
*/
-internal class ExclusiveFlowMultiplexerTest {
+internal class ForwardingFlowMultiplexerTest {
/**
* Test a trace workload.
*/
@@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest {
val forwarder = FlowForwarder(engine)
val adapter = FlowSourceRateAdapter(forwarder, speed::add)
source.startConsumer(adapter)
- switch.addOutput(forwarder)
+ forwarder.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
val provider = switch.newInput()
provider.consume(workload)
@@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest {
val switch = ForwardingFlowMultiplexer(engine)
val source = FlowSink(engine, 3200.0)
- switch.addOutput(source)
+ source.startConsumer(switch.newOutput())
switch.newInput()
assertThrows<IllegalStateException> { switch.newInput() }
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
index 9f6b8a2c..6e2cdb98 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest {
val switch = MaxMinFlowMultiplexer(scheduler)
val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { switch.addOutput(it) }
+ sources.forEach { it.startConsumer(switch.newOutput()) }
val provider = switch.newInput()
val consumer = FixedFlowSource(2000.0, 1.0)
@@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val provider = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
provider.consume(workload)
yield()
} finally {
@@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}
@@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest {
)
val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
val providerA = switch.newInput()
val providerB = switch.newInput()
try {
- switch.addOutput(FlowSink(scheduler, 3200.0))
+ sink.startConsumer(switch.newOutput())
coroutineScope {
launch { providerA.consume(workloadA) }
@@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest {
assertAll(
{ assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
{ assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
{ assertEquals(1200000, clock.millis()) }
)
}