diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-01 22:04:35 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:41 +0200 |
| commit | 081221684fb826ab5a00c1d8cc5a9886b9e2203c (patch) | |
| tree | 7f2202429256b4cb96812f96b682a021f8236180 /opendc-simulator/opendc-simulator-flow | |
| parent | 6e424e9b44687d01e618e7bc38afc427610cd845 (diff) | |
feat(simulator): Expose CPU time counters directly on hypervisor
This change adds a new interface to the SimHypervisor interface that
exposes the CPU time counters directly. These are derived from the flow
counters and will be used by SimHost to expose them via telemetry.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
11 files changed, 270 insertions, 119 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index 4834f10f..e927f81d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -83,8 +83,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -96,8 +96,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(3) { launch { @@ -113,8 +113,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -126,8 +126,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(2) { launch { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index c8092082..b02426e3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * The previous demand for the consumer. */ - private var previousDemand = 0.0 + private var _previousDemand = 0.0 + private var _previousCapacity = 0.0 /** * Update the counters of the flow consumer. */ protected fun updateCounters(ctx: FlowConnection, delta: Long) { - val demand = previousDemand - previousDemand = ctx.demand + val demand = _previousDemand + val capacity = _previousCapacity + + _previousDemand = ctx.demand + _previousCapacity = ctx.capacity if (delta <= 0) { return @@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi val counters = _counters val deltaS = delta / 1000.0 - val work = demand * deltaS + val total = demand * deltaS + val work = capacity * deltaS val actualWork = ctx.rate * deltaS - val remainingWork = work - actualWork counters.demand += work counters.actual += actualWork - counters.overcommit += remainingWork + counters.remaining += (total - actualWork) } /** * Update the counters of the flow consumer. */ - protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters.demand += demand counters.actual += actual - counters.overcommit += overcommit + counters.remaining += remaining } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index e15d7643..a717ae6e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -37,9 +37,9 @@ public interface FlowCounters { public val actual: Double /** - * The accumulated flow that could not be transferred over the connection. + * The amount of capacity that was not utilized. */ - public val overcommit: Double + public val remaining: Double /** * The accumulated flow lost due to interference between sources. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 17de601a..7eaaf6c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val counters = _counters val deltaS = delta / 1000.0 + val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork - counters.overcommit += (work - actualWork) + counters.remaining += (total - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 141d335d..d2fa5228 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 - override var overcommit: Double = 0.0 + override var remaining: Double = 0.0 override var interference: Double = 0.0 override fun reset() { demand = 0.0 actual = 0.0 - overcommit = 0.0 + remaining = 0.0 interference = 0.0 } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 17b82391..04ba7f21 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -39,7 +39,22 @@ public interface FlowMultiplexer { /** * The outputs of the multiplexer over which the flows will be distributed. */ - public val outputs: Set<FlowConsumer> + public val outputs: Set<FlowSource> + + /** + * The actual processing rate of the multiplexer. + */ + public val rate: Double + + /** + * The demanded processing rate of the input. + */ + public val demand: Double + + /** + * The capacity of the outputs. + */ + public val capacity: Double /** * The flow counters to track the flow metrics of all multiplexer inputs. @@ -59,12 +74,27 @@ public interface FlowMultiplexer { public fun removeInput(input: FlowConsumer) /** - * Add the specified [output] to the multiplexer. + * Create a new output on this multiplexer. */ - public fun addOutput(output: FlowConsumer) + public fun newOutput(): FlowSource /** - * Clear all inputs and outputs from the switch. + * Remove [output] from this multiplexer. + */ + public fun removeOutput(output: FlowSource) + + /** + * Clear all inputs and outputs from the multiplexer. */ public fun clear() + + /** + * Clear the inputs of the multiplexer. + */ + public fun clearInputs() + + /** + * Clear the outputs of the multiplexer. + */ + public fun clearOutputs() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 6dd9dcfb..125d10fe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul get() = _inputs private val _inputs = mutableSetOf<Input>() - override val outputs: Set<FlowConsumer> + override val outputs: Set<FlowSource> get() = _outputs - private val _outputs = mutableSetOf<FlowConsumer>() - private val _availableOutputs = ArrayDeque<FlowForwarder>() + private val _outputs = mutableSetOf<Output>() + private val _availableOutputs = ArrayDeque<Output>() override val counters: FlowCounters = object : FlowCounters { override val demand: Double - get() = _outputs.sumOf { it.counters.demand } + get() = _outputs.sumOf { it.forwarder.counters.demand } override val actual: Double - get() = _outputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _outputs.sumOf { it.counters.overcommit } + get() = _outputs.sumOf { it.forwarder.counters.actual } + override val remaining: Double + get() = _outputs.sumOf { it.forwarder.counters.remaining } override val interference: Double - get() = _outputs.sumOf { it.counters.interference } + get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { - for (input in _outputs) { - input.counters.reset() + for (output in _outputs) { + output.forwarder.counters.reset() } } - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } + override val rate: Double + get() = _outputs.sumOf { it.forwarder.rate } + + override val demand: Double + get() = _outputs.sumOf { it.forwarder.demand } + + override val capacity: Double + get() = _outputs.sumOf { it.forwarder.capacity } + override fun newInput(key: InterferenceKey?): FlowConsumer { - val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val output = Input(forwarder) - _inputs += output - return output + val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val input = Input(output) + _inputs += input + return input } override fun removeInput(input: FlowConsumer) { @@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return } - (input as Input).close() + val output = (input as Input).output + output.forwarder.cancel() + _availableOutputs += output } - override fun addOutput(output: FlowConsumer) { - if (output in outputs) { - return - } - + override fun newOutput(): FlowSource { val forwarder = FlowForwarder(engine) + val output = Output(forwarder) _outputs += output - _availableOutputs += forwarder + return output + } - output.startConsumer(object : FlowSource by forwarder { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _outputs -= output + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } - forwarder.onStop(conn, now, delta) - } - }) + val forwarder = (output as Output).forwarder + forwarder.close() } - override fun clear() { - for (input in _outputs) { - input.cancel() + override fun clearInputs() { + for (input in _inputs) { + val output = input.output + output.forwarder.cancel() + _availableOutputs += output } - _outputs.clear() - // Inputs are implicitly cancelled by the output forwarders _inputs.clear() } + override fun clearOutputs() { + for (output in _outputs) { + output.forwarder.cancel() + } + _outputs.clear() + _availableOutputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() + } + /** * An input on the multiplexer. */ - private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { - /** - * Close the input. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across input resources. - _inputs -= this - _availableOutputs += forwarder + private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } + + /** + * An output on the multiplexer. + */ + private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { + override fun onStart(conn: FlowConnection, now: Long) { + _availableOutputs += this + forwarder.onStart(conn, now) } - override fun toString(): String = "ForwardingFlowMultiplexer.Input" + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + forwarder.cancel() + forwarder.onStop(conn, now, delta) + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Output" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7232df35..5ff0fb8d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer( /** * The outputs of the multiplexer. */ - override val outputs: Set<FlowConsumer> + override val outputs: Set<FlowSource> get() = _outputs - private val _outputs = mutableSetOf<FlowConsumer>() + private val _outputs = mutableSetOf<Output>() private val _activeOutputs = mutableListOf<Output>() /** @@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer( /** * The actual processing rate of the multiplexer. */ + public override val rate: Double + get() = _rate private var _rate = 0.0 /** * The demanded processing rate of the input. */ + public override val demand: Double + get() = _demand private var _demand = 0.0 /** * The capacity of the outputs. */ + public override val capacity: Double + get() = _capacity private var _capacity = 0.0 /** * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) @@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer( return provider } - override fun addOutput(output: FlowConsumer) { - val consumer = Output(output) - if (_outputs.add(output)) { - _activeOutputs.add(consumer) - output.startConsumer(consumer) - } - } - override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return @@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer( (input as Input).close() } - override fun clear() { - for (input in _activeOutputs) { + override fun newOutput(): FlowSource { + val output = Output() + _outputs.add(output) + return output + } + + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } + + // This cast should always succeed since only `Output` instances should be added to `_outputs` + (output as Output).cancel() + } + + override fun clearInputs() { + for (input in _inputs) { input.cancel() } - _activeOutputs.clear() + _inputs.clear() + } - for (output in _activeInputs) { + override fun clearOutputs() { + for (output in _outputs) { output.cancel() } - _activeInputs.clear() + _outputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() } /** @@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer( if (_schedulerActive) { return } - + val lastSchedulerCycle = _lastSchedulerCycle + val delta = max(0, now - lastSchedulerCycle) _schedulerActive = true + _lastSchedulerCycle = now + try { - doSchedule(now) + doSchedule(now, delta) } finally { _schedulerActive = false } @@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. */ - private fun doSchedule(now: Long) { + private fun doSchedule(now: Long, delta: Long) { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + // Update the counters of the scheduler + updateCounters(delta) + // If there is no work yet, mark the inputs as idle. if (activeInputs.isEmpty()) { + _demand = 0.0 + _rate = 0.0 return } @@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer( // Remove outputs that have finished if (!input.isActive) { + input.actualRate = 0.0 inputIterator.remove() } } @@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer( // Divide the available output capacity fairly over the inputs using max-min fair sharing var remaining = activeInputs.size - for (input in activeInputs) { + for (i in activeInputs.indices) { + val input = activeInputs[i] val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) @@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer( activeOutputs.sort() // Divide the requests over the available capacity of the input resources fairly - for (output in activeOutputs) { + for (i in activeOutputs.indices) { + val output = activeOutputs[i] val inputCapacity = output.capacity val fraction = inputCapacity / capacity val grantedSpeed = rate * fraction @@ -220,6 +258,29 @@ public class MaxMinFlowMultiplexer( } /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 + + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = _capacity + + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 + + _counters.demand += _demand * deltaS + _counters.actual += _rate * deltaS + _counters.remaining += (previousCapacity - _rate) * deltaS + } + + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ private inner class Input(capacity: Double, val key: InterferenceKey?) : @@ -253,6 +314,11 @@ public class MaxMinFlowMultiplexer( private var _lastPull: Long = Long.MIN_VALUE /** + * The interference domain this input belongs to. + */ + private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + + /** * Close the input. * * This method is invoked when the user removes an input from the switch. @@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this - if (parent != null) { ctx.shouldConsumerConverge = true } @@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer( doUpdateCounters(delta) actualRate = 0.0 - this.limit = rate + limit = rate _lastPull = now runScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + val lastConverge = _lastConverge + val parent = parent + + if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { + _lastConverge = now + _lastConvergeInput = this + + parent.onConverge(now, max(0, now - lastConverge)) + } } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 _lastPull = now + + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == this) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + runScheduler(now) } /* Comparable */ @@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity + val load = _rate / _capacity interferenceDomain.apply(key, load) } else { 1.0 } val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork + val demand = limit * deltaS + val actual = actualRate * deltaS + val remaining = (capacity - actualRate) * deltaS - updateCounters(work, actualWork, remainingWork) + updateCounters(demand, actual, remaining) - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) + _counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> { + private inner class Output : FlowSource, Comparable<Output> { /** * The active [FlowConnection] of this source. */ - private var _ctx: FlowConnection? = null + private var _conn: FlowConnection? = null /** * The capacity of this output. @@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer( * Push the specified rate to the consumer. */ fun push(rate: Double) { - _ctx?.push(rate) + _conn?.push(rate) } /** * Cancel this output. */ fun cancel() { - provider.cancel() + _conn?.close() } override fun onStart(conn: FlowConnection, now: Long) { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn + assert(_conn == null) { "Source running concurrently" } + _conn = conn capacity = conn.capacity + _activeOutputs.add(this) + updateCapacity() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _ctx = null + _conn = null capacity = 0.0 + _activeOutputs.remove(this) + updateCapacity() + + runScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer( updateCapacity() } + // Re-run scheduler to distribute new load runScheduler(now) return Long.MAX_VALUE } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d548451f..12e72b8f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -217,7 +217,7 @@ internal class FlowForwarderTest { assertEquals(2.0, source.counters.actual) assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } assertEquals(2000, clock.millis()) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt index 3475f027..187dacd9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource /** * Test suite for the [ForwardingFlowMultiplexer] class. */ -internal class ExclusiveFlowMultiplexerTest { +internal class ForwardingFlowMultiplexerTest { /** * Test a trace workload. */ @@ -63,7 +63,7 @@ internal class ExclusiveFlowMultiplexerTest { val forwarder = FlowForwarder(engine) val adapter = FlowSourceRateAdapter(forwarder, speed::add) source.startConsumer(adapter) - switch.addOutput(forwarder) + forwarder.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -88,7 +88,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -127,7 +127,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) val provider = switch.newInput() provider.consume(workload) @@ -146,7 +146,7 @@ internal class ExclusiveFlowMultiplexerTest { val switch = ForwardingFlowMultiplexer(engine) val source = FlowSink(engine, 3200.0) - switch.addOutput(source) + source.startConsumer(switch.newOutput()) switch.newInput() assertThrows<IllegalStateException> { switch.newInput() } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt index 9f6b8a2c..6e2cdb98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest { val switch = MaxMinFlowMultiplexer(scheduler) val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } + sources.forEach { it.startConsumer(switch.newOutput()) } val provider = switch.newInput() val consumer = FixedFlowSource(2000.0, 1.0) @@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val provider = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) provider.consume(workload) yield() } finally { @@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val providerA = switch.newInput() val providerB = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) coroutineScope { launch { providerA.consume(workloadA) } @@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } |
