diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
11 files changed, 270 insertions, 119 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index 4834f10f..e927f81d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -83,8 +83,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -96,8 +96,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(3) { launch { @@ -113,8 +113,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -126,8 +126,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(2) { launch { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index c8092082..b02426e3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * The previous demand for the consumer. */ - private var previousDemand = 0.0 + private var _previousDemand = 0.0 + private var _previousCapacity = 0.0 /** * Update the counters of the flow consumer. */ protected fun updateCounters(ctx: FlowConnection, delta: Long) { - val demand = previousDemand - previousDemand = ctx.demand + val demand = _previousDemand + val capacity = _previousCapacity + + _previousDemand = ctx.demand + _previousCapacity = ctx.capacity if (delta <= 0) { return @@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi val counters = _counters val deltaS = delta / 1000.0 - val work = demand * deltaS + val total = demand * deltaS + val work = capacity * deltaS val actualWork = ctx.rate * deltaS - val remainingWork = work - actualWork counters.demand += work counters.actual += actualWork - counters.overcommit += remainingWork + counters.remaining += (total - actualWork) } /** * Update the counters of the flow consumer. */ - protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters.demand += demand counters.actual += actual - counters.overcommit += overcommit + counters.remaining += remaining } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index e15d7643..a717ae6e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -37,9 +37,9 @@ public interface FlowCounters { public val actual: Double /** - * The accumulated flow that could not be transferred over the connection. + * The amount of capacity that was not utilized. */ - public val overcommit: Double + public val remaining: Double /** * The accumulated flow lost due to interference between sources. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 17de601a..7eaaf6c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val counters = _counters val deltaS = delta / 1000.0 + val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork - counters.overcommit += (work - actualWork) + counters.remaining += (total - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 141d335d..d2fa5228 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 - override var overcommit: Double = 0.0 + override var remaining: Double = 0.0 override var interference: Double = 0.0 override fun reset() { demand = 0.0 actual = 0.0 - overcommit = 0.0 + remaining = 0.0 interference = 0.0 } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 17b82391..04ba7f21 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -39,7 +39,22 @@ public interface FlowMultiplexer { /** * The outputs of the multiplexer over which the flows will be distributed. */ - public val outputs: Set<FlowConsumer> + public val outputs: Set<FlowSource> + + /** + * The actual processing rate of the multiplexer. + */ + public val rate: Double + + /** + * The demanded processing rate of the input. + */ + public val demand: Double + + /** + * The capacity of the outputs. + */ + public val capacity: Double /** * The flow counters to track the flow metrics of all multiplexer inputs. @@ -59,12 +74,27 @@ public interface FlowMultiplexer { public fun removeInput(input: FlowConsumer) /** - * Add the specified [output] to the multiplexer. + * Create a new output on this multiplexer. */ - public fun addOutput(output: FlowConsumer) + public fun newOutput(): FlowSource /** - * Clear all inputs and outputs from the switch. + * Remove [output] from this multiplexer. + */ + public fun removeOutput(output: FlowSource) + + /** + * Clear all inputs and outputs from the multiplexer. */ public fun clear() + + /** + * Clear the inputs of the multiplexer. + */ + public fun clearInputs() + + /** + * Clear the outputs of the multiplexer. + */ + public fun clearOutputs() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 6dd9dcfb..125d10fe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul get() = _inputs private val _inputs = mutableSetOf<Input>() - override val outputs: Set<FlowConsumer> + override val outputs: Set<FlowSource> get() = _outputs - private val _outputs = mutableSetOf<FlowConsumer>() - private val _availableOutputs = ArrayDeque<FlowForwarder>() + private val _outputs = mutableSetOf<Output>() + private val _availableOutputs = ArrayDeque<Output>() override val counters: FlowCounters = object : FlowCounters { override val demand: Double - get() = _outputs.sumOf { it.counters.demand } + get() = _outputs.sumOf { it.forwarder.counters.demand } override val actual: Double - get() = _outputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _outputs.sumOf { it.counters.overcommit } + get() = _outputs.sumOf { it.forwarder.counters.actual } + override val remaining: Double + get() = _outputs.sumOf { it.forwarder.counters.remaining } override val interference: Double - get() = _outputs.sumOf { it.counters.interference } + get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { - for (input in _outputs) { - input.counters.reset() + for (output in _outputs) { + output.forwarder.counters.reset() } } - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } + override val rate: Double + get() = _outputs.sumOf { it.forwarder.rate } + + override val demand: Double + get() = _outputs.sumOf { it.forwarder.demand } + + override val capacity: Double + get() = _outputs.sumOf { it.forwarder.capacity } + override fun newInput(key: InterferenceKey?): FlowConsumer { - val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val output = Input(forwarder) - _inputs += output - return output + val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val input = Input(output) + _inputs += input + return input } override fun removeInput(input: FlowConsumer) { @@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return } - (input as Input).close() + val output = (input as Input).output + output.forwarder.cancel() + _availableOutputs += output } - override fun addOutput(output: FlowConsumer) { - if (output in outputs) { - return - } - + override fun newOutput(): FlowSource { val forwarder = FlowForwarder(engine) + val output = Output(forwarder) _outputs += output - _availableOutputs += forwarder + return output + } - output.startConsumer(object : FlowSource by forwarder { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _outputs -= output + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } - forwarder.onStop(conn, now, delta) - } - }) + val forwarder = (output as Output).forwarder + forwarder.close() } - override fun clear() { - for (input in _outputs) { - input.cancel() + override fun clearInputs() { + for (input in _inputs) { + val output = input.output + output.forwarder.cancel() + _availableOutputs += output } - _outputs.clear() - // Inputs are implicitly cancelled by the output forwarders _inputs.clear() } + override fun clearOutputs() { + for (output in _outputs) { + output.forwarder.cancel() + } + _outputs.clear() + _availableOutputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() + } + /** * An input on the multiplexer. */ - private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { - /** - * Close the input. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across input resources. - _inputs -= this - _availableOutputs += forwarder + private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } + + /** + * An output on the multiplexer. + */ + private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { + override fun onStart(conn: FlowConnection, now: Long) { + _availableOutputs += this + forwarder.onStart(conn, now) } - override fun toString(): String = "ForwardingFlowMultiplexer.Input" + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + forwarder.cancel() + forwarder.onStop(conn, now, delta) + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Output" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7232df35..5ff0fb8d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer( /** * The outputs of the multiplexer. */ - override val outputs: Set<FlowConsumer> + override val outputs: Set<FlowSource> get() = _outputs - private val _outputs = mutableSetOf<FlowConsumer>() + private val _outputs = mutableSetOf<Output>() private val _activeOutputs = mutableListOf<Output>() /** @@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer( /** * The actual processing rate of the multiplexer. */ + public override val rate: Double + get() = _rate private var _rate = 0.0 /** * The demanded processing rate of the input. */ + public override val demand: Double + get() = _demand private var _demand = 0.0 /** * The capacity of the outputs. */ + public override val capacity: Double + get() = _capacity private var _capacity = 0.0 /** * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) @@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer( return provider } - override fun addOutput(output: FlowConsumer) { - val consumer = Output(output) - if (_outputs.add(output)) { - _activeOutputs.add(consumer) - output.startConsumer(consumer) - } - } - override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return @@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer( (input as Input).close() } - override fun clear() { - for (input in _activeOutputs) { + override fun newOutput(): FlowSource { + val output = Output() + _outputs.add(output) + return output + } + + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } + + // This cast should always succeed since only `Output` instances should be added to `_outputs` + (output as Output).cancel() + } + + override fun clearInputs() { + for (input in _inputs) { input.cancel() } - _activeOutputs.clear() + _inputs.clear() + } - for (output in _activeInputs) { + override fun clearOutputs() { + for (output in _outputs) { output.cancel() } - _activeInputs.clear() + _outputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() } /** @@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer( if (_schedulerActive) { return } - + val lastSchedulerCycle = _lastSchedulerCycle + val delta = max(0, now - lastSchedulerCycle) _schedulerActive = true + _lastSchedulerCycle = now + try { - doSchedule(now) + doSchedule(now, delta) } finally { _schedulerActive = false } @@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. */ - private fun doSchedule(now: Long) { + private fun doSchedule(now: Long, delta: Long) { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + // Update the counters of the scheduler + updateCounters(delta) + // If there is no work yet, mark the inputs as idle. if (activeInputs.isEmpty()) { + _demand = 0.0 + _rate = 0.0 return } @@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer( // Remove outputs that have finished if (!input.isActive) { + input.actualRate = 0.0 inputIterator.remove() } } @@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer( // Divide the available output capacity fairly over the inputs using max-min fair sharing var remaining = activeInputs.size - for (input in activeInputs) { + for (i in activeInputs.indices) { + val input = activeInputs[i] val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) @@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer( activeOutputs.sort() // Divide the requests over the available capacity of the input resources fairly - for (output in activeOutputs) { + for (i in activeOutputs.indices) { + val output = activeOutputs[i] val inputCapacity = output.capacity val fraction = inputCapacity / capacity val grantedSpeed = rate * fraction @@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer( } /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 + + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = _capacity + + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 + + _counters.demand += _demand * deltaS + _counters.actual += _rate * deltaS + _counters.remaining += (previousCapacity - _rate) * deltaS + } + + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ private inner class Input(capacity: Double, val key: InterferenceKey?) : @@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer( private var _lastPull: Long = Long.MIN_VALUE /** + * The interference domain this input belongs to. + */ + private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + + /** * Close the input. * * This method is invoked when the user removes an input from the switch. @@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this - if (parent != null) { ctx.shouldConsumerConverge = true } @@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer( doUpdateCounters(delta) actualRate = 0.0 - this.limit = rate + limit = rate _lastPull = now runScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + val lastConverge = _lastConverge + val parent = parent + + if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { + _lastConverge = now + _lastConvergeInput = this + + parent.onConverge(now, max(0, now - lastConverge)) + } } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 _lastPull = now + + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == this) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + runScheduler(now) } /* Comparable */ @@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity + val load = _rate / _capacity interferenceDomain.apply(key, load) } else { 1.0 } val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork + val demand = limit * deltaS + val actual = actualRate * deltaS + val remaining = (capacity - actualRate) * deltaS - updateCounters(work, actualWork, remainingWork) + updateCounters(demand, actual, remaining) - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) + _counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> { + private inner class Output : FlowSource, Comparable<Output> { /** * The active [FlowConnection] of this source. */ - private var _ctx: FlowConnection? = null + private var _conn: FlowConnection? = null /** * The capacity of this output. @@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer( * Push the specified rate to the consumer. */ fun push(rate: Double) { - _ctx?.push(rate) + _conn?.push(rate) } /** * Cancel this output. */ fun cancel() { - provider.cancel() + _conn?.close() } override fun onStart(conn: FlowConnection, now: Long) { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn + assert(_conn == null) { "Source running concurrently" } + _conn = conn capacity = conn.capacity + _activeOutputs.add(this) + updateCapacity() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _ctx = null + _conn = null capacity = 0.0 + _activeOutputs.remove(this) + updateCapacity() + + runScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer( updateCapacity() } + // Re-run scheduler to distribute new load runScheduler(now) return Long.MAX_VALUE } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d548451f..12e72b8f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -217,7 +217,7 @@ internal class FlowForwarderTest { assertEquals(2.0, source.counters.actual) assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } assertEquals(2000, clock.millis()) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 3475f027..187dacd9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource /** * Test suite for the [ForwardingFlowMultiplexer] class. */ -internal class ExclusiveFlowMultiplexerTest { +internal class ForwardingFlowMultiplexerTest { /** * Test a trace workload. */ @@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest { val forwarder = FlowForwarder(engine) val adapter = FlowSourceRateAdapter(forwarder, speed::add) source.startConsumer(adapter) - switch.addOutput(forwarder) + forwarder.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) switch.newInput() assertThrows<IllegalStateException> { switch.newInput() } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt index 9f6b8a2c..6e2cdb98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest { val switch = MaxMinFlowMultiplexer(scheduler) val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } + sources.forEach { it.startConsumer(switch.newOutput()) } val provider = switch.newInput() val consumer = FixedFlowSource(2000.0, 1.0) @@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val provider = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) provider.consume(workload) yield() } finally { @@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val providerA = switch.newInput() val providerB = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) coroutineScope { launch { providerA.consume(workloadA) } @@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } |
