diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 18:15:09 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-10-03 18:15:09 +0200 |
| commit | b92d0e8703014f143ff0b1fe67de09fff6f867b1 (patch) | |
| tree | 34238f56af20f0eb697f25ad5a700bab7fa4d6fb /opendc-simulator/opendc-simulator-network | |
| parent | 54bccf522e169d5cba6489291217f3307ae71094 (diff) | |
| parent | 012fe8fa9be1676b8eef0cce795738a00c4260c0 (diff) | |
merge: Migrate to flow-based simulation for low-level models
This pull request converts the `opendc-simulator-resources` module into a flow
simulator and adapts the existing low-level models (e.g., CPU, network, disk)
to this new flow simulator.
The flow simulator works differently from the uniform resource consumption
model, in that it models flow through a system of connections, as opposed to
resource consumptions. Concretely, this means that while in the uniform
resource consumption model, consumptions with the same usage are propagated to
the resources, in the flow simulator, only changes to the flow in the system
are propagated.
Overall, this leads to less updates in the system and therefore higher
performance. The benchmarks shows that the new implementation obtains more than
double the performance of the old implementation. We have focused in the new
implementation on reducing the amount of work and memory
allocations/loads/stores per updates.
* Migrate from kotlinx-benchmark to jmh-gradle (for better profiling support)
* Use longer traces for benchmarks (to prevent measuring the benchmark
overhead)
* Use direct field access for perf-sensitive code
* Combine work and deadline to duration
* Add support for pushing flow from context (to eliminate the allocation for
every `SimResourceCommand`)
* Reduce memory allocations in SimResourceInterpreter, by revamping the way
timers are allocated.
* Simplify max-min aggregator implementation (by utilizing the new push
mechanism)
* Invoke consumer callback on every invalidation (in order to propagate changes
downstream)
* Lazily push changes to resource context (by not updating the flow rate
immediately after a push, but only after an update)
* Remove onUpdate callback
* Merge distributor and aggregator into switch
* Separate push and pull flags
* Remove failure callback from FlowSource
* Create separate callbacks for remaining events
* Make convergence callback optional
* Reduce field accesses in FlowConsumerContextImpl
* Optimize hot path in SimTraceWorkload
* Expose CPU time counters directly on hypervisor
* Optimize telemetry collection
**Breaking API Changes**
* The entire `opendc-simulator-resources` module has been replaced by the
`opendc-simulator-flow` module.
* `SimHypervisor.Listener` has been removed in favour of a new interface that
exposes the performance counters of the hypervisor directly. To listen for
convergence, use `FlowConvergenceListener`.
Diffstat (limited to 'opendc-simulator/opendc-simulator-network')
6 files changed, 64 insertions, 63 deletions
diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts index eb9adcd1..f8931053 100644 --- a/opendc-simulator/opendc-simulator-network/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts @@ -30,6 +30,8 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(projects.opendcSimulator.opendcSimulatorResources) + api(projects.opendcSimulator.opendcSimulatorFlow) implementation(projects.opendcSimulator.opendcSimulatorCore) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt index 102e5625..4b66d5cf 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkPort.kt @@ -22,8 +22,8 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.SimResourceConsumer -import org.opendc.simulator.resources.SimResourceProvider +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowSource /** * A network port allows network devices to be connected to network through links. @@ -78,14 +78,14 @@ public abstract class SimNetworkPort { } /** - * Create a [SimResourceConsumer] which generates the outgoing traffic of this port. + * Create a [FlowSource] which generates the outgoing traffic of this port. */ - protected abstract fun createConsumer(): SimResourceConsumer + protected abstract fun createConsumer(): FlowSource /** - * The [SimResourceProvider] which processes the ingoing traffic of this port. + * The [FlowConsumer] which processes the ingoing traffic of this port. */ - protected abstract val provider: SimResourceProvider + protected abstract val provider: FlowConsumer override fun toString(): String = "SimNetworkPort[isConnected=$isConnected]" } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt index 5efdbed9..4b0d7bbd 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSink.kt @@ -22,22 +22,22 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* /** * A network sink which discards all received traffic and does not generate any traffic itself. */ public class SimNetworkSink( - interpreter: SimResourceInterpreter, + engine: FlowEngine, public val capacity: Double ) : SimNetworkPort() { - override fun createConsumer(): SimResourceConsumer = object : SimResourceConsumer { - override fun onNext(ctx: SimResourceContext): SimResourceCommand = SimResourceCommand.Idle() + override fun createConsumer(): FlowSource = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE override fun toString(): String = "SimNetworkSink.Consumer" } - override val provider: SimResourceProvider = SimResourceSource(capacity, interpreter) + override val provider: FlowConsumer = FlowSink(engine, capacity) override fun toString(): String = "SimNetworkSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt index 05daaa5c..6667c80c 100644 --- a/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt +++ b/opendc-simulator/opendc-simulator-network/src/main/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtual.kt @@ -22,12 +22,13 @@ package org.opendc.simulator.network -import org.opendc.simulator.resources.* +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer /** * A [SimNetworkSwitch] that can support new networking ports on demand. */ -public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimNetworkSwitch { +public class SimNetworkSwitchVirtual(private val engine: FlowEngine) : SimNetworkSwitch { /** * The ports of this switch. */ @@ -36,9 +37,9 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN private val _ports = mutableListOf<Port>() /** - * The [SimResourceSwitchMaxMin] to actually perform the switching. + * The [MaxMinFlowMultiplexer] to actually perform the switching. */ - private val switch = SimResourceSwitchMaxMin(interpreter) + private val mux = MaxMinFlowMultiplexer(engine) /** * Open a new port on the switch. @@ -58,19 +59,17 @@ public class SimNetworkSwitchVirtual(interpreter: SimResourceInterpreter) : SimN */ private var isClosed: Boolean = false - override val provider: SimResourceProvider + override val provider: FlowConsumer get() = _provider - private val _provider = switch.newOutput() + private val _provider = mux.newInput() - override fun createConsumer(): SimResourceConsumer { - val forwarder = SimResourceForwarder(isCoupled = true) - switch.addInput(forwarder) - return forwarder - } + private val _source = mux.newOutput() + + override fun createConsumer(): FlowSource = _source override fun close() { isClosed = true - _provider.close() + mux.removeInput(_provider) _ports.remove(this) } } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index b8c4b00d..14d22162 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -31,8 +31,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimNetworkSink] class. @@ -40,8 +40,8 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimNetworkSinkTest { @Test fun testInitialState() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertFalse(sink.isConnected) assertNull(sink.link) @@ -50,8 +50,8 @@ class SimNetworkSinkTest { @Test fun testDisconnectIdempotent() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertDoesNotThrow { sink.disconnect() } assertFalse(sink.isConnected) @@ -59,8 +59,8 @@ class SimNetworkSinkTest { @Test fun testConnectCircular() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) assertThrows<IllegalArgumentException> { sink.connect(sink) @@ -69,8 +69,8 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnectedTarget() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) val source = mockk<SimNetworkPort>(relaxUnitFun = true) every { source.isConnected } returns true @@ -81,9 +81,9 @@ class SimNetworkSinkTest { @Test fun testConnectAlreadyConnected() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source1 = Source(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source1 = Source(engine) val source2 = mockk<SimNetworkPort>(relaxUnitFun = true) @@ -97,9 +97,9 @@ class SimNetworkSinkTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) val consumer = source.consumer sink.connect(source) @@ -108,14 +108,14 @@ class SimNetworkSinkTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), SimResourceEvent.Start) } + verify { consumer.onStart(any(), any()) } } @Test fun testDisconnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) val consumer = source.consumer sink.connect(source) @@ -124,14 +124,14 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onEvent(any(), SimResourceEvent.Exit) } + verify { consumer.onStop(any(), any(), any()) } } - private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() { - val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8)) + private class Source(engine: FlowEngine) : SimNetworkPort() { + val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8)) - public override fun createConsumer(): SimResourceConsumer = consumer + public override fun createConsumer(): FlowSource = consumer - override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter) + override val provider: FlowConsumer = FlowSink(engine, 0.0) } } diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt index 3a749bfe..62e54ffb 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt @@ -28,8 +28,8 @@ import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.resources.* -import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.source.FixedFlowSource /** * Test suite for the [SimNetworkSwitchVirtual] class. @@ -37,10 +37,10 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimNetworkSwitchVirtualTest { @Test fun testConnect() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val source = spyk(Source(interpreter)) - val switch = SimNetworkSwitchVirtual(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val source = spyk(Source(engine)) + val switch = SimNetworkSwitchVirtual(engine) val consumer = source.consumer switch.newPort().connect(sink) @@ -50,14 +50,14 @@ class SimNetworkSwitchVirtualTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), SimResourceEvent.Start) } + verify { consumer.onStart(any(), any()) } } @Test fun testConnectClosedPort() = runBlockingSimulation { - val interpreter = SimResourceInterpreter(coroutineContext, clock) - val sink = SimNetworkSink(interpreter, capacity = 100.0) - val switch = SimNetworkSwitchVirtual(interpreter) + val engine = FlowEngine(coroutineContext, clock) + val sink = SimNetworkSink(engine, capacity = 100.0) + val switch = SimNetworkSwitchVirtual(engine) val port = switch.newPort() port.close() @@ -67,11 +67,11 @@ class SimNetworkSwitchVirtualTest { } } - private class Source(interpreter: SimResourceInterpreter) : SimNetworkPort() { - val consumer = spyk(SimWorkConsumer(Double.POSITIVE_INFINITY, utilization = 0.8)) + private class Source(engine: FlowEngine) : SimNetworkPort() { + val consumer = spyk(FixedFlowSource(Double.POSITIVE_INFINITY, utilization = 0.8)) - public override fun createConsumer(): SimResourceConsumer = consumer + public override fun createConsumer(): FlowSource = consumer - override val provider: SimResourceProvider = SimResourceSource(0.0, interpreter) + override val provider: FlowConsumer = FlowSink(engine, 0.0) } } |
