From 081221684fb826ab5a00c1d8cc5a9886b9e2203c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 1 Oct 2021 22:04:35 +0200 Subject: feat(simulator): Expose CPU time counters directly on hypervisor This change adds a new interface to the SimHypervisor interface that exposes the CPU time counters directly. These are derived from the flow counters and will be used by SimHost to expose them via telemetry. --- .../org/opendc/simulator/flow/FlowBenchmarks.kt | 16 +- .../opendc/simulator/flow/AbstractFlowConsumer.kt | 20 ++- .../org/opendc/simulator/flow/FlowCounters.kt | 4 +- .../org/opendc/simulator/flow/FlowForwarder.kt | 3 +- .../simulator/flow/internal/FlowCountersImpl.kt | 6 +- .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 38 ++++- .../flow/mux/ForwardingFlowMultiplexer.kt | 114 ++++++++------ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 164 ++++++++++++++++----- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 2 +- .../flow/mux/ExclusiveFlowMultiplexerTest.kt | 154 ------------------- .../flow/mux/ForwardingFlowMultiplexerTest.kt | 154 +++++++++++++++++++ .../flow/mux/MaxMinFlowMultiplexerTest.kt | 12 +- 12 files changed, 419 insertions(+), 268 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index 4834f10f..e927f81d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -83,8 +83,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -96,8 +96,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(3) { launch { @@ -113,8 +113,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -126,8 +126,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(2) { launch { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index c8092082..b02426e3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * The previous demand for the consumer. */ - private var previousDemand = 0.0 + private var _previousDemand = 0.0 + private var _previousCapacity = 0.0 /** * Update the counters of the flow consumer. */ protected fun updateCounters(ctx: FlowConnection, delta: Long) { - val demand = previousDemand - previousDemand = ctx.demand + val demand = _previousDemand + val capacity = _previousCapacity + + _previousDemand = ctx.demand + _previousCapacity = ctx.capacity if (delta <= 0) { return @@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi val counters = _counters val deltaS = delta / 1000.0 - val work = demand * deltaS + val total = demand * deltaS + val work = capacity * deltaS val actualWork = ctx.rate * deltaS - val remainingWork = work - actualWork counters.demand += work counters.actual += actualWork - counters.overcommit += remainingWork + counters.remaining += (total - actualWork) } /** * Update the counters of the flow consumer. */ - protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters.demand += demand counters.actual += actual - counters.overcommit += overcommit + counters.remaining += remaining } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index e15d7643..a717ae6e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -37,9 +37,9 @@ public interface FlowCounters { public val actual: Double /** - * The accumulated flow that could not be transferred over the connection. + * The amount of capacity that was not utilized. */ - public val overcommit: Double + public val remaining: Double /** * The accumulated flow lost due to interference between sources. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 17de601a..7eaaf6c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val counters = _counters val deltaS = delta / 1000.0 + val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork - counters.overcommit += (work - actualWork) + counters.remaining += (total - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 141d335d..d2fa5228 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 - override var overcommit: Double = 0.0 + override var remaining: Double = 0.0 override var interference: Double = 0.0 override fun reset() { demand = 0.0 actual = 0.0 - overcommit = 0.0 + remaining = 0.0 interference = 0.0 } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 17b82391..04ba7f21 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -39,7 +39,22 @@ public interface FlowMultiplexer { /** * The outputs of the multiplexer over which the flows will be distributed. */ - public val outputs: Set + public val outputs: Set + + /** + * The actual processing rate of the multiplexer. + */ + public val rate: Double + + /** + * The demanded processing rate of the input. + */ + public val demand: Double + + /** + * The capacity of the outputs. + */ + public val capacity: Double /** * The flow counters to track the flow metrics of all multiplexer inputs. @@ -59,12 +74,27 @@ public interface FlowMultiplexer { public fun removeInput(input: FlowConsumer) /** - * Add the specified [output] to the multiplexer. + * Create a new output on this multiplexer. */ - public fun addOutput(output: FlowConsumer) + public fun newOutput(): FlowSource /** - * Clear all inputs and outputs from the switch. + * Remove [output] from this multiplexer. + */ + public fun removeOutput(output: FlowSource) + + /** + * Clear all inputs and outputs from the multiplexer. */ public fun clear() + + /** + * Clear the inputs of the multiplexer. + */ + public fun clearInputs() + + /** + * Clear the outputs of the multiplexer. + */ + public fun clearOutputs() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 6dd9dcfb..125d10fe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul get() = _inputs private val _inputs = mutableSetOf() - override val outputs: Set + override val outputs: Set get() = _outputs - private val _outputs = mutableSetOf() - private val _availableOutputs = ArrayDeque() + private val _outputs = mutableSetOf() + private val _availableOutputs = ArrayDeque() override val counters: FlowCounters = object : FlowCounters { override val demand: Double - get() = _outputs.sumOf { it.counters.demand } + get() = _outputs.sumOf { it.forwarder.counters.demand } override val actual: Double - get() = _outputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _outputs.sumOf { it.counters.overcommit } + get() = _outputs.sumOf { it.forwarder.counters.actual } + override val remaining: Double + get() = _outputs.sumOf { it.forwarder.counters.remaining } override val interference: Double - get() = _outputs.sumOf { it.counters.interference } + get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { - for (input in _outputs) { - input.counters.reset() + for (output in _outputs) { + output.forwarder.counters.reset() } } - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } + override val rate: Double + get() = _outputs.sumOf { it.forwarder.rate } + + override val demand: Double + get() = _outputs.sumOf { it.forwarder.demand } + + override val capacity: Double + get() = _outputs.sumOf { it.forwarder.capacity } + override fun newInput(key: InterferenceKey?): FlowConsumer { - val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val output = Input(forwarder) - _inputs += output - return output + val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val input = Input(output) + _inputs += input + return input } override fun removeInput(input: FlowConsumer) { @@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return } - (input as Input).close() + val output = (input as Input).output + output.forwarder.cancel() + _availableOutputs += output } - override fun addOutput(output: FlowConsumer) { - if (output in outputs) { - return - } - + override fun newOutput(): FlowSource { val forwarder = FlowForwarder(engine) + val output = Output(forwarder) _outputs += output - _availableOutputs += forwarder + return output + } - output.startConsumer(object : FlowSource by forwarder { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _outputs -= output + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } - forwarder.onStop(conn, now, delta) - } - }) + val forwarder = (output as Output).forwarder + forwarder.close() } - override fun clear() { - for (input in _outputs) { - input.cancel() + override fun clearInputs() { + for (input in _inputs) { + val output = input.output + output.forwarder.cancel() + _availableOutputs += output } - _outputs.clear() - // Inputs are implicitly cancelled by the output forwarders _inputs.clear() } + override fun clearOutputs() { + for (output in _outputs) { + output.forwarder.cancel() + } + _outputs.clear() + _availableOutputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() + } + /** * An input on the multiplexer. */ - private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { - /** - * Close the input. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across input resources. - _inputs -= this - _availableOutputs += forwarder + private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } + + /** + * An output on the multiplexer. + */ + private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { + override fun onStart(conn: FlowConnection, now: Long) { + _availableOutputs += this + forwarder.onStart(conn, now) } - override fun toString(): String = "ForwardingFlowMultiplexer.Input" + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + forwarder.cancel() + forwarder.onStop(conn, now, delta) + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Output" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7232df35..5ff0fb8d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer( /** * The outputs of the multiplexer. */ - override val outputs: Set + override val outputs: Set get() = _outputs - private val _outputs = mutableSetOf() + private val _outputs = mutableSetOf() private val _activeOutputs = mutableListOf() /** @@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer( /** * The actual processing rate of the multiplexer. */ + public override val rate: Double + get() = _rate private var _rate = 0.0 /** * The demanded processing rate of the input. */ + public override val demand: Double + get() = _demand private var _demand = 0.0 /** * The capacity of the outputs. */ + public override val capacity: Double + get() = _capacity private var _capacity = 0.0 /** * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) @@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer( return provider } - override fun addOutput(output: FlowConsumer) { - val consumer = Output(output) - if (_outputs.add(output)) { - _activeOutputs.add(consumer) - output.startConsumer(consumer) - } - } - override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return @@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer( (input as Input).close() } - override fun clear() { - for (input in _activeOutputs) { + override fun newOutput(): FlowSource { + val output = Output() + _outputs.add(output) + return output + } + + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } + + // This cast should always succeed since only `Output` instances should be added to `_outputs` + (output as Output).cancel() + } + + override fun clearInputs() { + for (input in _inputs) { input.cancel() } - _activeOutputs.clear() + _inputs.clear() + } - for (output in _activeInputs) { + override fun clearOutputs() { + for (output in _outputs) { output.cancel() } - _activeInputs.clear() + _outputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() } /** @@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer( if (_schedulerActive) { return } - + val lastSchedulerCycle = _lastSchedulerCycle + val delta = max(0, now - lastSchedulerCycle) _schedulerActive = true + _lastSchedulerCycle = now + try { - doSchedule(now) + doSchedule(now, delta) } finally { _schedulerActive = false } @@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. */ - private fun doSchedule(now: Long) { + private fun doSchedule(now: Long, delta: Long) { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + // Update the counters of the scheduler + updateCounters(delta) + // If there is no work yet, mark the inputs as idle. if (activeInputs.isEmpty()) { + _demand = 0.0 + _rate = 0.0 return } @@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer( // Remove outputs that have finished if (!input.isActive) { + input.actualRate = 0.0 inputIterator.remove() } } @@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer( // Divide the available output capacity fairly over the inputs using max-min fair sharing var remaining = activeInputs.size - for (input in activeInputs) { + for (i in activeInputs.indices) { + val input = activeInputs[i] val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) @@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer( activeOutputs.sort() // Divide the requests over the available capacity of the input resources fairly - for (output in activeOutputs) { + for (i in activeOutputs.indices) { + val output = activeOutputs[i] val inputCapacity = output.capacity val fraction = inputCapacity / capacity val grantedSpeed = rate * fraction @@ -219,6 +257,29 @@ public class MaxMinFlowMultiplexer( } } + /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 + + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = _capacity + + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 + + _counters.demand += _demand * deltaS + _counters.actual += _rate * deltaS + _counters.remaining += (previousCapacity - _rate) * deltaS + } + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ @@ -252,6 +313,11 @@ public class MaxMinFlowMultiplexer( */ private var _lastPull: Long = Long.MIN_VALUE + /** + * The interference domain this input belongs to. + */ + private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + /** * Close the input. * @@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this - if (parent != null) { ctx.shouldConsumerConverge = true } @@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer( doUpdateCounters(delta) actualRate = 0.0 - this.limit = rate + limit = rate _lastPull = now runScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + val lastConverge = _lastConverge + val parent = parent + + if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { + _lastConverge = now + _lastConvergeInput = this + + parent.onConverge(now, max(0, now - lastConverge)) + } } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 _lastPull = now + + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == this) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + runScheduler(now) } /* Comparable */ @@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity + val load = _rate / _capacity interferenceDomain.apply(key, load) } else { 1.0 } val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork + val demand = limit * deltaS + val actual = actualRate * deltaS + val remaining = (capacity - actualRate) * deltaS - updateCounters(work, actualWork, remainingWork) + updateCounters(demand, actual, remaining) - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) + _counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable { + private inner class Output : FlowSource, Comparable { /** * The active [FlowConnection] of this source. */ - private var _ctx: FlowConnection? = null + private var _conn: FlowConnection? = null /** * The capacity of this output. @@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer( * Push the specified rate to the consumer. */ fun push(rate: Double) { - _ctx?.push(rate) + _conn?.push(rate) } /** * Cancel this output. */ fun cancel() { - provider.cancel() + _conn?.close() } override fun onStart(conn: FlowConnection, now: Long) { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn + assert(_conn == null) { "Source running concurrently" } + _conn = conn capacity = conn.capacity + _activeOutputs.add(this) + updateCapacity() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _ctx = null + _conn = null capacity = 0.0 + _activeOutputs.remove(this) + updateCapacity() + + runScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer( updateCapacity() } + // Re-run scheduler to distribute new load runScheduler(now) return Long.MAX_VALUE } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d548451f..12e72b8f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -217,7 +217,7 @@ internal class FlowForwarderTest { assertEquals(2.0, source.counters.actual) assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } assertEquals(2000, clock.millis()) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt deleted file mode 100644 index 3475f027..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class ExclusiveFlowMultiplexerTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - switch.addOutput(forwarder) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - isFirst = true - } - - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - switch.newInput() - assertThrows { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt new file mode 100644 index 00000000..187dacd9 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +internal class ForwardingFlowMultiplexerTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val speed = mutableListOf() + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) + source.startConsumer(adapter) + forwarder.startConsumer(switch.newOutput()) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = FixedFlowSource(duration * 3.2, 1.0) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + source.startConsumer(switch.newOutput()) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : FlowSource { + var isFirst = true + + override fun onStart(conn: FlowConnection, now: Long) { + isFirst = true + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + source.startConsumer(switch.newOutput()) + + val provider = switch.newInput() + provider.consume(workload) + yield() + provider.consume(workload) + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + source.startConsumer(switch.newOutput()) + + switch.newInput() + assertThrows { switch.newInput() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt index 9f6b8a2c..6e2cdb98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest { val switch = MaxMinFlowMultiplexer(scheduler) val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } + sources.forEach { it.startConsumer(switch.newOutput()) } val provider = switch.newInput() val consumer = FixedFlowSource(2000.0, 1.0) @@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val provider = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) provider.consume(workload) yield() } finally { @@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val providerA = switch.newInput() val providerB = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) coroutineScope { launch { providerA.consume(workloadA) } @@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } -- cgit v1.2.3