summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-04 16:39:28 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:13:04 +0200
commit2a457d9e5480407d76440a2277817cb735c86ae1 (patch)
tree5f7b1896f0cb825537b0772fafeaa0d6d7f79474
parent037c0600d0fe3ec699f239c41ab0e60a458484d7 (diff)
refactor(sim/flow): Remove old flow simulator
This change removes the old version of the flow simulator from the OpenDC repository. The old version has been replaced by the new flow2 framework which is able to simulate flows more efficiently.
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt137
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt62
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt57
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt48
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt264
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt75
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt160
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt44
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt436
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt119
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt218
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt200
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt124
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt60
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt177
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt811
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt66
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt52
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt107
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt331
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt245
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt158
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt150
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt57
33 files changed, 0 insertions, 4783 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
deleted file mode 100644
index 9e0a4a5e..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import kotlinx.coroutines.launch
-import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
-import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-import org.openjdk.jmh.annotations.Benchmark
-import org.openjdk.jmh.annotations.Fork
-import org.openjdk.jmh.annotations.Measurement
-import org.openjdk.jmh.annotations.Scope
-import org.openjdk.jmh.annotations.Setup
-import org.openjdk.jmh.annotations.State
-import org.openjdk.jmh.annotations.Warmup
-import java.util.concurrent.ThreadLocalRandom
-import java.util.concurrent.TimeUnit
-
-@State(Scope.Thread)
-@Fork(1)
-@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
-class FlowBenchmarks {
- private lateinit var trace: Sequence<TraceFlowSource.Fragment>
-
- @Setup
- fun setUp() {
- val random = ThreadLocalRandom.current()
- val entries = List(1000000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
- trace = entries.asSequence()
- }
-
- @Benchmark
- fun benchmarkSink() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val provider = FlowSink(engine, 4200.0)
- return@runSimulation provider.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkForward() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val provider = FlowSink(engine, 4200.0)
- val forwarder = FlowForwarder(engine)
- provider.startConsumer(forwarder)
- return@runSimulation forwarder.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkMuxMaxMinSingleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- return@runSimulation provider.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkMuxMaxMinTripleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- repeat(3) {
- launch {
- val provider = switch.newInput()
- provider.consume(TraceFlowSource(trace))
- }
- }
- }
- }
-
- @Benchmark
- fun benchmarkMuxExclusiveSingleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = ForwardingFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- return@runSimulation provider.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkMuxExclusiveTripleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = ForwardingFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- repeat(2) {
- launch {
- val provider = switch.newInput()
- provider.consume(TraceFlowSource(trace))
- }
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
deleted file mode 100644
index 8ff0bc76..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * An active connection between a [FlowSource] and [FlowConsumer].
- */
-public interface FlowConnection : AutoCloseable {
- /**
- * The capacity of the connection.
- */
- public val capacity: Double
-
- /**
- * The flow rate over the connection.
- */
- public val rate: Double
-
- /**
- * The flow demand of the source.
- */
- public val demand: Double
-
- /**
- * A flag to control whether [FlowSource.onConverge] should be invoked for this source.
- */
- public var shouldSourceConverge: Boolean
-
- /**
- * Pull the source.
- */
- public fun pull()
-
- /**
- * Pull the source.
- *
- * @param now The timestamp at which the connection is pulled.
- */
- public fun pull(now: Long)
-
- /**
- * Push the given flow [rate] over this connection.
- *
- * @param rate The rate of the flow to push.
- */
- public fun push(rate: Double)
-
- /**
- * Disconnect the consumer from its source.
- */
- public override fun close()
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
deleted file mode 100644
index a49826f4..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-
-/**
- * A consumer of a [FlowSource].
- */
-public interface FlowConsumer {
- /**
- * A flag to indicate that the consumer is currently consuming a [FlowSource].
- */
- public val isActive: Boolean
-
- /**
- * The flow capacity of this consumer.
- */
- public val capacity: Double
-
- /**
- * The current flow rate of the consumer.
- */
- public val rate: Double
-
- /**
- * The current flow demand.
- */
- public val demand: Double
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public val counters: FlowCounters
-
- /**
- * Start consuming the specified [source].
- *
- * @throws IllegalStateException if the consumer is already active.
- */
- public fun startConsumer(source: FlowSource)
-
- /**
- * Ask the consumer to pull its source.
- *
- * If the consumer is not active, this operation will be a no-op.
- */
- public fun pull()
-
- /**
- * Disconnect the consumer from its source.
- *
- * If the consumer is not active, this operation will be a no-op.
- */
- public fun cancel()
-}
-
-/**
- * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
- */
-public suspend fun FlowConsumer.consume(source: FlowSource) {
- return suspendCancellableCoroutine { cont ->
- startConsumer(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- try {
- source.onStart(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- try {
- source.onStop(conn, now)
-
- if (!cont.isCompleted) {
- cont.resume(Unit)
- }
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return try {
- source.onPull(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- source.onConverge(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun toString(): String = "SuspendingFlowSource"
- })
-
- cont.invokeOnCancellation { cancel() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
deleted file mode 100644
index 98922ab3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A controllable [FlowConnection].
- *
- * This interface is used by [FlowConsumer]s to control the connection between it and the source.
- */
-public interface FlowConsumerContext : FlowConnection {
- /**
- * The deadline of the source.
- */
- public val deadline: Long
-
- /**
- * The capacity of the connection.
- */
- public override var capacity: Double
-
- /**
- * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer.
- */
- public var shouldConsumerConverge: Boolean
-
- /**
- * A flag to control whether the timers for the [FlowSource] should be enabled.
- */
- public var enableTimers: Boolean
-
- /**
- * Start the flow over the connection.
- */
- public fun start()
-
- /**
- * Synchronously pull the source of the connection.
- *
- * @param now The timestamp at which the connection is pulled.
- */
- public fun pullSync(now: Long)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
deleted file mode 100644
index 1d3adb10..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A collection of callbacks associated with a [FlowConsumer].
- */
-public interface FlowConsumerLogic {
- /**
- * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param rate The requested processing rate of the source.
- */
- public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {}
-
- /**
- * This method is invoked when the flow graph has converged into a steady-state system.
- *
- * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method
- * will not be invoked.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the system converged.
- */
- public fun onConverge(ctx: FlowConsumerContext, now: Long) {}
-
- /**
- * This method is invoked when the [FlowSource] completed or failed.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param cause The cause of the failure or `null` if the source completed.
- */
- public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
deleted file mode 100644
index 62cb10d1..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A listener interface for when a flow stage has converged into a steady-state.
- */
-public interface FlowConvergenceListener {
- /**
- * This method is invoked when the system has converged to a steady-state.
- *
- * @param now The timestamp at which the system converged.
- */
- public fun onConverge(now: Long) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
deleted file mode 100644
index d8ad7978..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * An interface that tracks cumulative counts of the flow accumulation over a stage.
- */
-public interface FlowCounters {
- /**
- * The accumulated flow that a source wanted to push over the connection.
- */
- public val demand: Double
-
- /**
- * The accumulated flow that was actually transferred over the connection.
- */
- public val actual: Double
-
- /**
- * The amount of capacity that was not utilized.
- */
- public val remaining: Double
-
- /**
- * Reset the flow counters.
- */
- public fun reset()
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
deleted file mode 100644
index 65224827..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
- *
- * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
- * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
- */
-public interface FlowEngine {
- /**
- * The virtual [Clock] associated with this engine.
- */
- public val clock: Clock
-
- /**
- * Create a new [FlowConsumerContext] with the given [provider].
- *
- * @param consumer The consumer logic.
- * @param provider The logic of the resource provider.
- */
- public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
-
- /**
- * Start batching the execution of resource updates until [popBatch] is called.
- *
- * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
- * simultaneously) in a single state update.
- *
- * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
- * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
- * the same amount of times. To simplify batching, see [batch].
- */
- public fun pushBatch()
-
- /**
- * Stop the batching of resource updates and run the interpreter on the batch.
- *
- * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
- */
- public fun popBatch()
-
- public companion object {
- /**
- * Construct a new [FlowEngine] implementation.
- *
- * @param context The coroutine context to use.
- * @param clock The virtual simulation clock.
- */
- @JvmStatic
- @JvmName("create")
- public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
- return FlowEngineImpl(context, clock)
- }
- }
-}
-
-/**
- * Batch the execution of several interrupts into a single call.
- *
- * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
- */
-public inline fun FlowEngine.batch(block: () -> Unit) {
- try {
- pushBatch()
- block()
- } finally {
- popBatch()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
deleted file mode 100644
index 5202c252..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import mu.KotlinLogging
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * The logging instance of this connection.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
- *
- * @param engine The [FlowEngine] the forwarder runs in.
- * @param listener The convergence lister to use.
- * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
- */
-public class FlowForwarder(
- private val engine: FlowEngine,
- private val listener: FlowConvergenceListener? = null,
- private val isCoupled: Boolean = false
-) : FlowSource, FlowConsumer, AutoCloseable {
- /**
- * The delegate [FlowSource].
- */
- private var delegate: FlowSource? = null
-
- /**
- * A flag to indicate that the delegate was started.
- */
- private var hasDelegateStarted: Boolean = false
-
- /**
- * The exposed [FlowConnection].
- */
- private val _ctx = object : FlowConnection {
- override var shouldSourceConverge: Boolean = false
- set(value) {
- field = value
- _innerCtx?.shouldSourceConverge = value
- }
-
- override val capacity: Double
- get() = _innerCtx?.capacity ?: 0.0
-
- override val demand: Double
- get() = _innerCtx?.demand ?: 0.0
-
- override val rate: Double
- get() = _innerCtx?.rate ?: 0.0
-
- override fun pull() {
- _innerCtx?.pull()
- }
-
- override fun pull(now: Long) {
- _innerCtx?.pull(now)
- }
-
- override fun push(rate: Double) {
- if (delegate == null) {
- return
- }
-
- _innerCtx?.push(rate)
- _demand = rate
- }
-
- override fun close() {
- val delegate = delegate ?: return
- val hasDelegateStarted = hasDelegateStarted
-
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
-
- if (hasDelegateStarted) {
- val now = engine.clock.millis()
- delegate.onStop(this, now)
- }
- }
- }
-
- /**
- * The [FlowConnection] in which the forwarder runs.
- */
- private var _innerCtx: FlowConnection? = null
-
- override val isActive: Boolean
- get() = delegate != null
-
- override val capacity: Double
- get() = _ctx.capacity
-
- override val rate: Double
- get() = _ctx.rate
-
- override val demand: Double
- get() = _ctx.demand
-
- override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- override fun startConsumer(source: FlowSource) {
- check(delegate == null) { "Forwarder already active" }
-
- delegate = source
-
- // Pull to replace the source
- pull()
- }
-
- override fun pull() {
- _ctx.pull()
- }
-
- override fun cancel() {
- _ctx.close()
- }
-
- override fun close() {
- val ctx = _innerCtx
-
- if (ctx != null) {
- this._innerCtx = null
- ctx.pull()
- }
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- _innerCtx = conn
-
- if (listener != null || _ctx.shouldSourceConverge) {
- conn.shouldSourceConverge = true
- }
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _innerCtx = null
-
- val delegate = delegate
- if (delegate != null) {
- reset()
-
- try {
- delegate.onStop(this._ctx, now)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- }
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val delegate = delegate
-
- if (!hasDelegateStarted) {
- start()
- }
-
- updateCounters(conn, now)
-
- return try {
- delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- reset()
- Long.MAX_VALUE
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- delegate?.onConverge(this._ctx, now)
- listener?.onConverge(now)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- _innerCtx = null
- reset()
- }
- }
-
- /**
- * Start the delegate.
- */
- private fun start() {
- val delegate = delegate ?: return
-
- try {
- val now = engine.clock.millis()
- delegate.onStart(_ctx, now)
- hasDelegateStarted = true
- _lastUpdate = now
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- reset()
- }
- }
-
- /**
- * Reset the delegate.
- */
- private fun reset() {
- if (isCoupled) {
- _innerCtx?.close()
- } else {
- _innerCtx?.push(0.0)
- }
-
- delegate = null
- hasDelegateStarted = false
- }
-
- /**
- * The requested flow rate.
- */
- private var _demand: Double = 0.0
- private var _lastUpdate = 0L
-
- /**
- * Update the flow counters for the transformer.
- */
- private fun updateCounters(ctx: FlowConnection, now: Long) {
- val lastUpdate = _lastUpdate
- _lastUpdate = now
- val delta = now - lastUpdate
- if (delta <= 0) {
- return
- }
-
- val counters = _counters
- val deltaS = delta * D_MS_TO_S
- val total = ctx.capacity * deltaS
- val work = _demand * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.increment(work, actualWork, (total - actualWork))
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
deleted file mode 100644
index af702701..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A [FlowConsumer] that maps the pushed flow through [transform].
- *
- * @param source The source of the flow.
- * @param transform The method to transform the flow.
- */
-public class FlowMapper(
- private val source: FlowSource,
- private val transform: (FlowConnection, Double) -> Double
-) : FlowSource {
-
- /**
- * The current active connection.
- */
- private var _conn: Connection? = null
-
- override fun onStart(conn: FlowConnection, now: Long) {
- check(_conn == null) { "Concurrent access" }
- val delegate = Connection(conn, transform)
- _conn = delegate
- source.onStart(delegate, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- val delegate = checkNotNull(_conn) { "Invariant violation" }
- _conn = null
- source.onStop(delegate, now)
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val delegate = checkNotNull(_conn) { "Invariant violation" }
- return source.onPull(delegate, now)
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- val delegate = _conn ?: return
- source.onConverge(delegate, now)
- }
-
- /**
- * The wrapper [FlowConnection] that is used to transform the flow.
- */
- private class Connection(
- private val delegate: FlowConnection,
- private val transform: (FlowConnection, Double) -> Double
- ) : FlowConnection by delegate {
- override fun push(rate: Double) {
- delegate.push(transform(this, rate))
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
deleted file mode 100644
index ee8cd739..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * A [FlowSink] represents a sink with a fixed capacity.
- *
- * @param initialCapacity The initial capacity of the resource.
- * @param engine The engine that is used for driving the flow simulation.
- * @param parent The parent flow system.
- */
-public class FlowSink(
- private val engine: FlowEngine,
- initialCapacity: Double,
- private val parent: FlowConvergenceListener? = null
-) : FlowConsumer {
- /**
- * A flag to indicate that the flow consumer is active.
- */
- public override val isActive: Boolean
- get() = _ctx != null
-
- /**
- * The capacity of the consumer.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- _ctx?.capacity = value
- }
-
- /**
- * The current processing rate of the consumer.
- */
- public override val rate: Double
- get() = _ctx?.rate ?: 0.0
-
- /**
- * The flow processing rate demand at this instant.
- */
- public override val demand: Double
- get() = _ctx?.demand ?: 0.0
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- /**
- * The current active [FlowConsumerLogic] of this sink.
- */
- private var _ctx: FlowConsumerContext? = null
-
- override fun startConsumer(source: FlowSource) {
- check(_ctx == null) { "Consumer is in invalid state" }
-
- val ctx = engine.newContext(source, Logic(parent, _counters))
- _ctx = ctx
-
- ctx.capacity = capacity
- if (parent != null) {
- ctx.shouldConsumerConverge = true
- }
-
- ctx.start()
- }
-
- override fun pull() {
- _ctx?.pull()
- }
-
- override fun cancel() {
- _ctx?.close()
- }
-
- override fun toString(): String = "FlowSink[capacity=$capacity]"
-
- /**
- * [FlowConsumerLogic] of a sink.
- */
- private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
-
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- rate: Double
- ) {
- updateCounters(ctx, now, rate, ctx.capacity)
- }
-
- override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
- updateCounters(ctx, now, 0.0, 0.0)
-
- _ctx = null
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long) {
- parent?.onConverge(now)
- }
-
- /**
- * The previous demand and capacity for the consumer.
- */
- private val _previous = DoubleArray(2)
- private var _previousUpdate = Long.MAX_VALUE
-
- /**
- * Update the counters of the flow consumer.
- */
- private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) {
- val previousUpdate = _previousUpdate
- _previousUpdate = now
- val delta = now - previousUpdate
-
- val counters = counters
- val previous = _previous
- val demand = previous[0]
- val capacity = previous[1]
-
- previous[0] = nextDemand
- previous[1] = nextCapacity
-
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta * D_MS_TO_S
- val total = demand * deltaS
- val work = capacity * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.increment(work, actualWork, (total - actualWork))
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
deleted file mode 100644
index a48ac18e..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A source of flow that is consumed by a [FlowConsumer].
- *
- * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
- * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
- */
-public interface FlowSource {
- /**
- * This method is invoked when the source is started.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the provider finished.
- */
- public fun onStart(conn: FlowConnection, now: Long) {}
-
- /**
- * This method is invoked when the source is finished.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the source finished.
- */
- public fun onStop(conn: FlowConnection, now: Long) {}
-
- /**
- * This method is invoked when the source is pulled.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the pull is occurring.
- * @return The duration after which the resource consumer should be pulled again.
- */
- public fun onPull(conn: FlowConnection, now: Long): Long
-
- /**
- * This method is invoked when the flow graph has converged into a steady-state system.
- *
- * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method
- * will not be invoked.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the system converged.
- */
- public fun onConverge(conn: FlowConnection, now: Long) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
deleted file mode 100644
index 450195ec..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * Constant for converting milliseconds into seconds.
- */
-internal const val D_MS_TO_S = 1 / 1000.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
deleted file mode 100644
index 97d56fff..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * States of the flow connection.
- */
-internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source
-internal const val ConnActive = 1 // Connection is active and the source is currently being consumed
-internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore
-internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection
-
-/**
- * Flags of the flow connection
- */
-internal const val ConnPulled = 1 shl 2 // The source should be pulled
-internal const val ConnPushed = 1 shl 3 // The source has pushed a value
-internal const val ConnClose = 1 shl 4 // The connection should be closed
-internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active
-internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending
-internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
-internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
-internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
-internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
deleted file mode 100644
index fba3af5f..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import mu.KotlinLogging
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.batch
-import java.util.ArrayDeque
-import kotlin.math.min
-
-/**
- * The logging instance of this connection.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
- */
-internal class FlowConsumerContextImpl(
- private val engine: FlowEngineImpl,
- private val source: FlowSource,
- private val logic: FlowConsumerLogic
-) : FlowConsumerContext {
- /**
- * The capacity of the connection.
- */
- override var capacity: Double
- get() = _capacity
- set(value) {
- val oldValue = _capacity
-
- // Only changes will be propagated
- if (value != oldValue) {
- _capacity = value
- pull()
- }
- }
- private var _capacity: Double = 0.0
-
- /**
- * The current processing rate of the connection.
- */
- override val rate: Double
- get() = _rate
- private var _rate = 0.0
-
- /**
- * The current flow processing demand.
- */
- override val demand: Double
- get() = _demand
- private var _demand: Double = 0.0 // The current (pending) demand of the source
-
- /**
- * The deadline of the source.
- */
- override val deadline: Long
- get() = _deadline
- private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
-
- /**
- * Flags to control the convergence of the consumer and source.
- */
- override var shouldSourceConverge: Boolean
- get() = _flags and ConnConvergeSource == ConnConvergeSource
- set(value) {
- _flags =
- if (value) {
- _flags or ConnConvergeSource
- } else {
- _flags and ConnConvergeSource.inv()
- }
- }
- override var shouldConsumerConverge: Boolean
- get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer
- set(value) {
- _flags =
- if (value) {
- _flags or ConnConvergeConsumer
- } else {
- _flags and ConnConvergeConsumer.inv()
- }
- }
-
- /**
- * Flag to control the timers on the [FlowSource]
- */
- override var enableTimers: Boolean
- get() = _flags and ConnDisableTimers != ConnDisableTimers
- set(value) {
- _flags =
- if (!value) {
- _flags or ConnDisableTimers
- } else {
- _flags and ConnDisableTimers.inv()
- }
- }
-
- /**
- * The clock to track simulation time.
- */
- private val _clock = engine.clock
-
- /**
- * The flags of the flow connection, indicating certain actions.
- */
- private var _flags: Int = 0
-
- /**
- * The timers at which the context is scheduled to be interrupted.
- */
- private var _timer: Long = Long.MAX_VALUE
- private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
-
- override fun start() {
- check(_flags and ConnState == ConnPending) { "Consumer is already started" }
- engine.batch {
- val now = _clock.millis()
- source.onStart(this, now)
-
- // Mark the connection as active and pulled
- val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled
- scheduleImmediate(now, newFlags)
- }
- }
-
- override fun close() {
- val flags = _flags
- if (flags and ConnState == ConnClosed) {
- return
- }
-
- // Toggle the close bit. In case no update is active, schedule a new update.
- if (flags and ConnUpdateActive == 0) {
- val now = _clock.millis()
- scheduleImmediate(now, flags or ConnClose)
- } else {
- _flags = flags or ConnClose
- }
- }
-
- override fun pull(now: Long) {
- val flags = _flags
- if (flags and ConnState != ConnActive) {
- return
- }
-
- // Mark connection as pulled
- scheduleImmediate(now, flags or ConnPulled)
- }
-
- override fun pull() {
- pull(_clock.millis())
- }
-
- override fun pullSync(now: Long) {
- val flags = _flags
-
- // Do not attempt to flush the connection if the connection is closed or an update is already active
- if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) {
- return
- }
-
- if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
- engine.scheduleSync(now, this)
- }
- }
-
- override fun push(rate: Double) {
- if (_demand == rate) {
- return
- }
-
- _demand = rate
-
- val flags = _flags
-
- if (flags and ConnUpdateActive != 0) {
- // If an update is active, it will already get picked up at the end of the update
- _flags = flags or ConnPushed
- } else {
- // Invalidate only if no update is active
- scheduleImmediate(_clock.millis(), flags or ConnPushed)
- }
- }
-
- /**
- * Update the state of the flow connection.
- *
- * @param now The current virtual timestamp.
- * @param visited The queue of connections that have been visited during the cycle.
- * @param timerQueue The queue of all pending timers.
- * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update.
- */
- fun doUpdate(
- now: Long,
- visited: FlowDeque,
- timerQueue: FlowTimerQueue,
- isImmediate: Boolean
- ) {
- var flags = _flags
-
- // Precondition: The flow connection must be active
- if (flags and ConnState != ConnActive) {
- return
- }
-
- val deadline = _deadline
- val reachedDeadline = deadline == now
- var newDeadline: Long
- var hasUpdated = false
-
- try {
- // Pull the source if (1) `pull` is called or (2) the timer of the source has expired
- newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
- // Update state before calling into the outside world, so it observes a consistent state
- _flags = (flags and ConnPulled.inv()) or ConnUpdateActive
- hasUpdated = true
-
- val duration = source.onPull(this, now)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = _flags
-
- if (duration != Long.MAX_VALUE) {
- now + duration
- } else {
- duration
- }
- } else {
- deadline
- }
-
- // Make the new deadline available for the consumer if it has changed
- if (newDeadline != deadline) {
- _deadline = newDeadline
- }
-
- // Push to the consumer if the rate of the source has changed (after a call to `push`)
- if (flags and ConnPushed != 0) {
- // Update state before calling into the outside world, so it observes a consistent state
- _flags = (flags and ConnPushed.inv()) or ConnUpdateActive
- hasUpdated = true
-
- logic.onPush(this, now, _demand)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = _flags
- }
-
- // Check whether the source or consumer have tried to close the connection
- if (flags and ConnClose != 0) {
- hasUpdated = true
-
- // The source has called [FlowConnection.close], so clean up the connection
- doStopSource(now)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- // We now also mark the connection as closed
- flags = (_flags and ConnState.inv()) or ConnClosed
-
- _demand = 0.0
- newDeadline = Long.MAX_VALUE
- }
- } catch (cause: Throwable) {
- hasUpdated = true
-
- // Clean up the connection
- doFailSource(now, cause)
-
- // Mark the connection as closed
- flags = (flags and ConnState.inv()) or ConnClosed
-
- _demand = 0.0
- newDeadline = Long.MAX_VALUE
- }
-
- // Check whether the connection needs to be added to the visited queue. This is the case when:
- // (1) An update was performed (either a push or a pull)
- // (2) Either the source or consumer want to converge, and
- // (3) Convergence is not already pending (ConnConvergePending)
- if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) {
- visited.add(this)
- flags = flags or ConnConvergePending
- }
-
- // Compute the new flow rate of the connection
- // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value
- _rate = min(_capacity, _demand)
-
- // Indicate that no update is active anymore and flush the flags
- _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv()
-
- val pendingTimers = _pendingTimers
-
- // Prune the head timer if this is a delayed update
- val timer = if (!isImmediate) {
- // Invariant: Any pending timer should only point to a future timestamp
- val timer = pendingTimers.poll() ?: Long.MAX_VALUE
- _timer = timer
- timer
- } else {
- _timer
- }
-
- // Check whether we need to schedule a new timer for this connection. That is the case when:
- // (1) The deadline is valid (not the maximum value)
- // (2) The connection is active
- // (3) Timers are not disabled for the source
- // (4) The current active timer for the connection points to a later deadline
- if (newDeadline == Long.MAX_VALUE ||
- flags and ConnState != ConnActive ||
- flags and ConnDisableTimers != 0 ||
- (timer != Long.MAX_VALUE && newDeadline >= timer)
- ) {
- // Ignore any deadline scheduled at the maximum value
- // This indicates that the source does not want to register a timer
- return
- }
-
- // Construct a timer for the new deadline and add it to the global queue of timers
- _timer = newDeadline
- timerQueue.add(this, newDeadline)
-
- // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
- if (timer != Long.MAX_VALUE) {
- pendingTimers.addFirst(timer)
- }
- }
-
- /**
- * This method is invoked when the system converges into a steady state.
- */
- fun onConverge(now: Long) {
- try {
- val flags = _flags
-
- // The connection is converging now, so unset the convergence pending flag
- _flags = flags and ConnConvergePending.inv()
-
- // Call the source converge callback if it has enabled convergence
- if (flags and ConnConvergeSource != 0) {
- source.onConverge(this, now)
- }
-
- // Call the consumer callback if it has enabled convergence
- if (flags and ConnConvergeConsumer != 0) {
- logic.onConverge(this, now)
- }
- } catch (cause: Throwable) {
- // Invoke the finish callbacks
- doFailSource(now, cause)
-
- // Mark the connection as closed
- _flags = (_flags and ConnState.inv()) or ConnClosed
- _demand = 0.0
- _deadline = Long.MAX_VALUE
- }
- }
-
- override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
-
- /**
- * Stop the [FlowSource].
- */
- private fun doStopSource(now: Long) {
- try {
- source.onStop(this, now)
- doFinishConsumer(now, null)
- } catch (cause: Throwable) {
- doFinishConsumer(now, cause)
- }
- }
-
- /**
- * Fail the [FlowSource].
- */
- private fun doFailSource(now: Long, cause: Throwable) {
- try {
- source.onStop(this, now)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- doFinishConsumer(now, e)
- }
- }
-
- /**
- * Finish the consumer.
- */
- private fun doFinishConsumer(now: Long, cause: Throwable?) {
- try {
- logic.onFinish(this, now, cause)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- logger.error(e) { "Uncaught exception" }
- }
- }
-
- /**
- * Schedule an immediate update for this connection.
- */
- private fun scheduleImmediate(now: Long, flags: Int) {
- // In case an immediate update is already scheduled, no need to do anything
- if (flags and ConnUpdatePending != 0) {
- _flags = flags
- return
- }
-
- // Mark the connection that there is an update pending
- _flags = flags or ConnUpdatePending
-
- engine.scheduleImmediate(now, this)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
deleted file mode 100644
index 403a9aec..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import java.util.ArrayDeque
-
-/**
- * A specialized [ArrayDeque] that tracks the [FlowConsumerContextImpl] instances that have updated in an interpreter
- * cycle.
- *
- * By using a specialized class, we reduce the overhead caused by type-erasure.
- */
-internal class FlowDeque(initialCapacity: Int = 256) {
- /**
- * The array of elements in the queue.
- */
- private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
- private var _head = 0
- private var _tail = 0
-
- /**
- * Determine whether this queue is not empty.
- */
- fun isNotEmpty(): Boolean {
- return _head != _tail
- }
-
- /**
- * Add the specified [ctx] to the queue.
- */
- fun add(ctx: FlowConsumerContextImpl) {
- val es = _elements
- var tail = _tail
-
- es[tail] = ctx
-
- tail = inc(tail, es.size)
- _tail = tail
-
- if (_head == tail) {
- doubleCapacity()
- }
- }
-
- /**
- * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
- */
- fun poll(): FlowConsumerContextImpl? {
- val es = _elements
- val head = _head
- val ctx = es[head]
-
- if (ctx != null) {
- es[head] = null
- _head = inc(head, es.size)
- }
-
- return ctx
- }
-
- /**
- * Clear the queue.
- */
- fun clear() {
- _elements.fill(null)
- _head = 0
- _tail = 0
- }
-
- private fun inc(i: Int, modulus: Int): Int {
- var x = i
- if (++x >= modulus) {
- x = 0
- }
- return x
- }
-
- /**
- * Doubles the capacity of this deque
- */
- private fun doubleCapacity() {
- assert(_head == _tail)
- val p = _head
- val n = _elements.size
- val r = n - p // number of elements to the right of p
-
- val newCapacity = n shl 1
- check(newCapacity >= 0) { "Sorry, deque too big" }
-
- val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
-
- _elements.copyInto(a, 0, p, n)
- _elements.copyInto(a, r, 0, p)
-
- _elements = a
- _head = 0
- _tail = n
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
deleted file mode 100644
index 6fd1ef31..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Runnable
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
-import java.time.Clock
-import java.util.ArrayDeque
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Internal implementation of the [FlowEngine] interface.
- *
- * @param context The coroutine context to use.
- * @param clock The virtual simulation clock.
- */
-internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable {
- /**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
-
- /**
- * The queue of connection updates that are scheduled for immediate execution.
- */
- private val queue = FlowDeque()
-
- /**
- * A priority queue containing the connection updates to be scheduled in the future.
- */
- private val futureQueue = FlowTimerQueue()
-
- /**
- * The stack of engine invocations to occur in the future.
- */
- private val futureInvocations = ArrayDeque<Invocation>()
-
- /**
- * The systems that have been visited during the engine cycle.
- */
- private val visited = FlowDeque()
-
- /**
- * The index in the batch stack.
- */
- private var batchIndex = 0
-
- /**
- * The virtual [Clock] of this engine.
- */
- override val clock: Clock
- get() = _clock
- private val _clock: Clock = clock
-
- /**
- * Update the specified [ctx] synchronously.
- */
- fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
-
- // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
- // up by the active engine.
- if (batchIndex > 0) {
- return
- }
-
- doRunEngine(now)
- }
-
- /**
- * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
- *
- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
- * re-computed. In case no engine is currently active, the engine will be started.
- */
- fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
- queue.add(ctx)
-
- // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
- // up by the active engine.
- if (batchIndex > 0) {
- return
- }
-
- doRunEngine(now)
- }
-
- override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
-
- override fun pushBatch() {
- batchIndex++
- }
-
- override fun popBatch() {
- try {
- // Flush the work if the engine is not already running
- if (batchIndex == 1 && queue.isNotEmpty()) {
- doRunEngine(_clock.millis())
- }
- } finally {
- batchIndex--
- }
- }
-
- /* Runnable */
- override fun run() {
- val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
- doRunEngine(invocation.timestamp)
- }
-
- /**
- * Run all the enqueued actions for the specified [timestamp][now].
- */
- private fun doRunEngine(now: Long) {
- val queue = queue
- val futureQueue = futureQueue
- val futureInvocations = futureInvocations
- val visited = visited
-
- try {
- // Increment batch index so synchronous calls will not launch concurrent engine invocations
- batchIndex++
-
- // Execute all scheduled updates at current timestamp
- while (true) {
- val ctx = futureQueue.poll(now) ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
- }
-
- // Repeat execution of all immediate updates until the system has converged to a steady-state
- // We have to take into account that the onConverge callback can also trigger new actions.
- do {
- // Execute all immediate updates
- while (true) {
- val ctx = queue.poll() ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
- }
-
- while (true) {
- val ctx = visited.poll() ?: break
- ctx.onConverge(now)
- }
- } while (queue.isNotEmpty())
- } finally {
- // Decrement batch index to indicate no engine is active at the moment
- batchIndex--
- }
-
- // Schedule an engine invocation for the next update to occur.
- val headDeadline = futureQueue.peekDeadline()
- if (headDeadline != Long.MAX_VALUE) {
- trySchedule(now, futureInvocations, headDeadline)
- }
- }
-
- /**
- * Try to schedule an engine invocation at the specified [target].
- *
- * @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the engine invocation should happen.
- * @param scheduled The queue of scheduled invocations.
- */
- private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- val head = scheduled.peek()
-
- // Only schedule a new scheduler invocation in case the target is earlier than all other pending
- // scheduler invocations
- if (head == null || target < head.timestamp) {
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, this, context)
- scheduled.addFirst(Invocation(target, handle))
- }
- }
-
- /**
- * A future engine invocation.
- *
- * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
- * the invocation is not needed anymore, it can be cancelled via [cancel].
- */
- private class Invocation(
- @JvmField val timestamp: Long,
- @JvmField val handle: DisposableHandle
- ) {
- /**
- * Cancel the engine invocation.
- */
- fun cancel() = handle.dispose()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
deleted file mode 100644
index 47061a91..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * Specialized priority queue for flow timers.
- *
- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
- * being generic.
- */
-internal class FlowTimerQueue(initialCapacity: Int = 256) {
- /**
- * The binary heap of deadlines.
- */
- private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
-
- /**
- * The binary heap of [FlowConsumerContextImpl]s.
- */
- private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
-
- /**
- * The number of elements in the priority queue.
- */
- private var size = 0
-
- /**
- * Register a timer for [ctx] with [deadline].
- */
- fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
- val i = size
- var deadlines = _deadlines
- if (i >= deadlines.size) {
- grow()
- // Re-fetch the resized array
- deadlines = _deadlines
- }
-
- siftUp(deadlines, _pending, i, ctx, deadline)
-
- size = i + 1
- }
-
- /**
- * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
- */
- fun poll(now: Long): FlowConsumerContextImpl? {
- if (size == 0) {
- return null
- }
-
- val deadlines = _deadlines
- val deadline = deadlines[0]
-
- if (now < deadline) {
- return null
- }
-
- val pending = _pending
- val res = pending[0]
- val s = --size
-
- val nextDeadline = deadlines[s]
- val next = pending[s]!!
-
- // Clear the last element of the queue
- pending[s] = null
- deadlines[s] = Long.MIN_VALUE
-
- if (s != 0) {
- siftDown(deadlines, pending, next, nextDeadline)
- }
-
- return res
- }
-
- /**
- * Find the earliest deadline in the queue.
- */
- fun peekDeadline(): Long {
- return if (size == 0) Long.MAX_VALUE else _deadlines[0]
- }
-
- /**
- * Increases the capacity of the array.
- */
- private fun grow() {
- val oldCapacity = _deadlines.size
- // Double size if small; else grow by 50%
- val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
-
- _deadlines = _deadlines.copyOf(newCapacity)
- _pending = _pending.copyOf(newCapacity)
- }
-
- /**
- * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
- * greater than or equal to its parent, or is the root.
- *
- * @param deadlines The heap of deadlines.
- * @param pending The heap of contexts.
- * @param pos The position to fill.
- * @param ctx The [FlowConsumerContextImpl] to insert.
- * @param deadline The deadline of the context.
- */
- private fun siftUp(
- deadlines: LongArray,
- pending: Array<FlowConsumerContextImpl?>,
- pos: Int,
- ctx: FlowConsumerContextImpl,
- deadline: Long
- ) {
- var k = pos
-
- while (k > 0) {
- val parent = (k - 1) ushr 1
- val parentDeadline = deadlines[parent]
-
- if (deadline >= parentDeadline) {
- break
- }
-
- deadlines[k] = parentDeadline
- pending[k] = pending[parent]
-
- k = parent
- }
-
- deadlines[k] = deadline
- pending[k] = ctx
- }
-
- /**
- * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
- * is less than or equal to its children or is a leaf.
- *
- * @param deadlines The heap of deadlines.
- * @param pending The heap of contexts.
- * @param ctx The [FlowConsumerContextImpl] to insert.
- * @param deadline The deadline of the context.
- */
- private fun siftDown(
- deadlines: LongArray,
- pending: Array<FlowConsumerContextImpl?>,
- ctx: FlowConsumerContextImpl,
- deadline: Long
- ) {
- var k = 0
- val size = size
- val half = size ushr 1
-
- while (k < half) {
- var child = (k shl 1) + 1
-
- var childDeadline = deadlines[child]
- val right = child + 1
-
- if (right < size) {
- val rightDeadline = deadlines[right]
-
- if (childDeadline > rightDeadline) {
- child = right
- childDeadline = rightDeadline
- }
- }
-
- if (deadline <= childDeadline) {
- break
- }
-
- deadlines[k] = childDeadline
- pending[k] = pending[child]
-
- k = child
- }
-
- deadlines[k] = deadline
- pending[k] = ctx
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
deleted file mode 100644
index c320a362..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import org.opendc.simulator.flow.FlowCounters
-
-/**
- * Mutable implementation of the [FlowCounters] interface.
- */
-public class MutableFlowCounters : FlowCounters {
- override val demand: Double
- get() = _counters[0]
- override val actual: Double
- get() = _counters[1]
- override val remaining: Double
- get() = _counters[2]
- private val _counters = DoubleArray(3)
-
- override fun reset() {
- _counters.fill(0.0)
- }
-
- public fun increment(demand: Double, actual: Double, remaining: Double) {
- val counters = _counters
- counters[0] += demand
- counters[1] += actual
- counters[2] += remaining
- }
-
- override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
deleted file mode 100644
index 8752c559..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
- */
-public interface FlowMultiplexer {
- /**
- * The maximum number of inputs supported by the multiplexer.
- */
- public val maxInputs: Int
-
- /**
- * The maximum number of outputs supported by the multiplexer.
- */
- public val maxOutputs: Int
-
- /**
- * The inputs of the multiplexer that can be used to consume sources.
- */
- public val inputs: Set<FlowConsumer>
-
- /**
- * The outputs of the multiplexer over which the flows will be distributed.
- */
- public val outputs: Set<FlowSource>
-
- /**
- * The actual processing rate of the multiplexer.
- */
- public val rate: Double
-
- /**
- * The demanded processing rate of the input.
- */
- public val demand: Double
-
- /**
- * The capacity of the outputs.
- */
- public val capacity: Double
-
- /**
- * The flow counters to track the flow metrics of all multiplexer inputs.
- */
- public val counters: FlowCounters
-
- /**
- * Create a new input on this multiplexer with a coupled capacity.
- */
- public fun newInput(): FlowConsumer
-
- /**
- * Create a new input on this multiplexer with the specified [capacity].
- *
- * @param capacity The capacity of the input.
- */
- public fun newInput(capacity: Double): FlowConsumer
-
- /**
- * Remove [input] from this multiplexer.
- */
- public fun removeInput(input: FlowConsumer)
-
- /**
- * Create a new output on this multiplexer.
- */
- public fun newOutput(): FlowSource
-
- /**
- * Remove [output] from this multiplexer.
- */
- public fun removeOutput(output: FlowSource)
-
- /**
- * Clear all inputs and outputs from the multiplexer.
- */
- public fun clear()
-
- /**
- * Clear the inputs of the multiplexer.
- */
- public fun clearInputs()
-
- /**
- * Clear the outputs of the multiplexer.
- */
- public fun clearOutputs()
-
- /**
- * Flush the counters of the multiplexer.
- */
- public fun flushCounters()
-
- /**
- * Flush the counters of the specified [input].
- */
- public fun flushCounters(input: FlowConsumer)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
deleted file mode 100644
index a863e3ad..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-
-/**
- * Factory interface for a [FlowMultiplexer] implementation.
- */
-public fun interface FlowMultiplexerFactory {
- /**
- * Construct a new [FlowMultiplexer] using the specified [engine] and [listener].
- */
- public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer
-
- public companion object {
- /**
- * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer].
- */
- private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) }
-
- /**
- * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer].
- */
- private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) }
-
- /**
- * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances.
- */
- @JvmStatic
- public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY
-
- /**
- * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances.
- */
- @JvmStatic
- public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
deleted file mode 100644
index 53f94a94..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSource
-import java.util.ArrayDeque
-
-/**
- * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
- * that a single input is directly connected to an output and that the multiplexer can only support as many
- * inputs as outputs.
- *
- * @param engine The [FlowEngine] driving the simulation.
- * @param listener The convergence listener of the multiplexer.
- */
-public class ForwardingFlowMultiplexer(
- private val engine: FlowEngine,
- private val listener: FlowConvergenceListener? = null
-) : FlowMultiplexer, FlowConvergenceListener {
-
- override val maxInputs: Int
- get() = _outputs.size
-
- override val maxOutputs: Int = Int.MAX_VALUE
-
- override val inputs: Set<FlowConsumer>
- get() = _inputs
- private val _inputs = mutableSetOf<Input>()
-
- override val outputs: Set<FlowSource>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
- private val _availableOutputs = ArrayDeque<Output>()
-
- override val counters: FlowCounters = object : FlowCounters {
- override val demand: Double
- get() = _outputs.sumOf { it.forwarder.counters.demand }
- override val actual: Double
- get() = _outputs.sumOf { it.forwarder.counters.actual }
- override val remaining: Double
- get() = _outputs.sumOf { it.forwarder.counters.remaining }
-
- override fun reset() {
- for (output in _outputs) {
- output.forwarder.counters.reset()
- }
- }
-
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
- }
-
- override val rate: Double
- get() = _outputs.sumOf { it.forwarder.rate }
-
- override val demand: Double
- get() = _outputs.sumOf { it.forwarder.demand }
-
- override val capacity: Double
- get() = _outputs.sumOf { it.forwarder.capacity }
-
- override fun newInput(): FlowConsumer {
- val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val input = Input(output)
- _inputs += input
- return input
- }
-
- override fun newInput(capacity: Double): FlowConsumer = newInput()
-
- override fun removeInput(input: FlowConsumer) {
- if (!_inputs.remove(input)) {
- return
- }
-
- val output = (input as Input).output
- output.forwarder.cancel()
- _availableOutputs += output
- }
-
- override fun newOutput(): FlowSource {
- val forwarder = FlowForwarder(engine, this)
- val output = Output(forwarder)
-
- _outputs += output
- return output
- }
-
- override fun removeOutput(output: FlowSource) {
- if (!_outputs.remove(output)) {
- return
- }
-
- val forwarder = (output as Output).forwarder
- forwarder.close()
- }
-
- override fun clearInputs() {
- for (input in _inputs) {
- val output = input.output
- output.forwarder.cancel()
- _availableOutputs += output
- }
-
- _inputs.clear()
- }
-
- override fun clearOutputs() {
- for (output in _outputs) {
- output.forwarder.cancel()
- }
- _outputs.clear()
- _availableOutputs.clear()
- }
-
- override fun clear() {
- clearOutputs()
- clearInputs()
- }
-
- override fun flushCounters() {}
-
- override fun flushCounters(input: FlowConsumer) {}
-
- override fun onConverge(now: Long) {
- listener?.onConverge(now)
- }
-
- /**
- * An input on the multiplexer.
- */
- private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
- }
-
- /**
- * An output on the multiplexer.
- */
- private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
- override fun onStart(conn: FlowConnection, now: Long) {
- _availableOutputs += this
- forwarder.onStart(conn, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- forwarder.cancel()
- forwarder.onStop(conn, now)
- }
-
- override fun toString(): String = "ForwardingFlowMultiplexer.Output"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
deleted file mode 100644
index d9c6f893..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ /dev/null
@@ -1,811 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.min
-
-/**
- * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
- *
- * @param engine The [FlowEngine] to drive the flow simulation.
- * @param parent The parent flow system of the multiplexer.
- */
-public class MaxMinFlowMultiplexer(
- private val engine: FlowEngine,
- parent: FlowConvergenceListener? = null
-) : FlowMultiplexer {
-
- override val maxInputs: Int = Int.MAX_VALUE
-
- override val maxOutputs: Int = Int.MAX_VALUE
-
- /**
- * The inputs of the multiplexer.
- */
- override val inputs: Set<FlowConsumer>
- get() = _inputs
- private val _inputs = mutableSetOf<Input>()
-
- /**
- * The outputs of the multiplexer.
- */
- override val outputs: Set<FlowSource>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
-
- /**
- * The flow counters of this multiplexer.
- */
- public override val counters: FlowCounters
- get() = scheduler.counters
-
- /**
- * The actual processing rate of the multiplexer.
- */
- public override val rate: Double
- get() = scheduler.rate
-
- /**
- * The demanded processing rate of the input.
- */
- public override val demand: Double
- get() = scheduler.demand
-
- /**
- * The capacity of the outputs.
- */
- public override val capacity: Double
- get() = scheduler.capacity
-
- /**
- * The [Scheduler] instance of this multiplexer.
- */
- private val scheduler = Scheduler(engine, parent)
-
- override fun newInput(): FlowConsumer {
- return newInput(isCoupled = true, Double.POSITIVE_INFINITY)
- }
-
- override fun newInput(capacity: Double): FlowConsumer {
- return newInput(isCoupled = false, capacity)
- }
-
- private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer {
- val provider = Input(engine, scheduler, isCoupled, initialCapacity)
- _inputs.add(provider)
- return provider
- }
-
- override fun removeInput(input: FlowConsumer) {
- if (!_inputs.remove(input)) {
- return
- }
- // This cast should always succeed since only `Input` instances should be added to `_inputs`
- (input as Input).close()
- }
-
- override fun newOutput(): FlowSource {
- val output = Output(scheduler)
- _outputs.add(output)
- return output
- }
-
- override fun removeOutput(output: FlowSource) {
- if (!_outputs.remove(output)) {
- return
- }
-
- // This cast should always succeed since only `Output` instances should be added to `_outputs`
- (output as Output).cancel()
- }
-
- override fun clearInputs() {
- for (input in _inputs) {
- input.cancel()
- }
- _inputs.clear()
- }
-
- override fun clearOutputs() {
- for (output in _outputs) {
- output.cancel()
- }
- _outputs.clear()
- }
-
- override fun clear() {
- clearOutputs()
- clearInputs()
- }
-
- override fun flushCounters() {
- scheduler.updateCounters(engine.clock.millis())
- }
-
- override fun flushCounters(input: FlowConsumer) {
- (input as Input).doUpdateCounters(engine.clock.millis())
- }
-
- /**
- * Helper class containing the scheduler state.
- */
- private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
- /**
- * The flow counters of this scheduler.
- */
- @JvmField val counters = MutableFlowCounters()
-
- /**
- * The flow rate of the multiplexer.
- */
- @JvmField var rate = 0.0
-
- /**
- * The demand for the multiplexer.
- */
- @JvmField var demand = 0.0
-
- /**
- * The capacity of the multiplexer.
- */
- @JvmField var capacity = 0.0
-
- /**
- * An [Output] that is used to activate the scheduler.
- */
- @JvmField var activationOutput: Output? = null
-
- /**
- * The active inputs registered with the scheduler.
- */
- private val _activeInputs = mutableListOf<Input>()
-
- /**
- * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
- */
- private var _inputArray = emptyArray<Input>()
-
- /**
- * The active outputs registered with the scheduler.
- */
- private val _activeOutputs = mutableListOf<Output>()
-
- /**
- * Flag to indicate that the scheduler is active.
- */
- private var _schedulerActive = false
-
- /**
- * The last convergence timestamp and the input.
- */
- private var _lastConverge: Long = Long.MIN_VALUE
- private var _lastConvergeInput: Input? = null
-
- /**
- * The simulation clock.
- */
- private val _clock = engine.clock
-
- /**
- * Register the specified [input] to this scheduler.
- */
- fun registerInput(input: Input) {
- _activeInputs.add(input)
- _inputArray = _activeInputs.toTypedArray()
-
- val hasActivationOutput = activationOutput != null
-
- // Disable timers and convergence of the source if one of the output manages it
- input.shouldConsumerConverge = !hasActivationOutput
- input.enableTimers = !hasActivationOutput
-
- if (input.isCoupled) {
- input.capacity = capacity
- }
-
- trigger(_clock.millis())
- }
-
- /**
- * De-register the specified [input] from this scheduler.
- */
- fun deregisterInput(input: Input, now: Long) {
- // Assign a new input responsible for handling the convergence events
- if (_lastConvergeInput == input) {
- _lastConvergeInput = null
- }
-
- _activeInputs.remove(input)
-
- // Re-run scheduler to distribute new load
- trigger(now)
- }
-
- /**
- * This method is invoked when one of the inputs converges.
- */
- fun convergeInput(input: Input, now: Long) {
- val lastConverge = _lastConverge
- val lastConvergeInput = _lastConvergeInput
- val parent = parent
-
- if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) {
- _lastConverge = now
- _lastConvergeInput = input
-
- parent.onConverge(now)
- }
- }
-
- /**
- * Register the specified [output] to this scheduler.
- */
- fun registerOutput(output: Output) {
- _activeOutputs.add(output)
-
- updateCapacity()
- updateActivationOutput()
- }
-
- /**
- * De-register the specified [output] from this scheduler.
- */
- fun deregisterOutput(output: Output, now: Long) {
- _activeOutputs.remove(output)
- updateCapacity()
-
- trigger(now)
- }
-
- /**
- * This method is invoked when one of the outputs converges.
- */
- fun convergeOutput(output: Output, now: Long) {
- val parent = parent
-
- if (parent != null) {
- _lastConverge = now
- parent.onConverge(now)
- }
-
- if (!output.isActive) {
- output.isActivationOutput = false
- updateActivationOutput()
- }
- }
-
- /**
- * Trigger the scheduler of the multiplexer.
- *
- * @param now The current virtual timestamp of the simulation.
- */
- fun trigger(now: Long) {
- if (_schedulerActive) {
- // No need to trigger the scheduler in case it is already active
- return
- }
-
- val activationOutput = activationOutput
-
- // We can run the scheduler in two ways:
- // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
- // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
- // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
- // a few inputs and little changes at the same timestamp.
- // We always pick for option (1) unless there are no outputs available.
- if (activationOutput != null) {
- activationOutput.pull(now)
- return
- } else {
- runScheduler(now)
- }
- }
-
- /**
- * Synchronously run the scheduler of the multiplexer.
- */
- fun runScheduler(now: Long): Long {
- return try {
- _schedulerActive = true
- doRunScheduler(now)
- } finally {
- _schedulerActive = false
- }
- }
-
- /**
- * Recompute the capacity of the multiplexer.
- */
- fun updateCapacity() {
- val newCapacity = _activeOutputs.sumOf(Output::capacity)
-
- // No-op if the capacity is unchanged
- if (capacity == newCapacity) {
- return
- }
-
- capacity = newCapacity
-
- for (input in _activeInputs) {
- if (input.isCoupled) {
- input.capacity = newCapacity
- }
- }
-
- // Sort outputs by their capacity
- _activeOutputs.sort()
- }
-
- /**
- * Updates the output that is used for scheduler activation.
- */
- private fun updateActivationOutput() {
- val output = _activeOutputs.firstOrNull()
- activationOutput = output
-
- if (output != null) {
- output.isActivationOutput = true
- }
-
- val hasActivationOutput = output != null
-
- for (input in _activeInputs) {
- input.shouldConsumerConverge = !hasActivationOutput
- input.enableTimers = !hasActivationOutput
- }
- }
-
- /**
- * Schedule the inputs over the outputs.
- *
- * @return The deadline after which a new scheduling cycle should start.
- */
- private fun doRunScheduler(now: Long): Long {
- val activeInputs = _activeInputs
- val activeOutputs = _activeOutputs
- var inputArray = _inputArray
- var inputSize = _inputArray.size
-
- // Update the counters of the scheduler
- updateCounters(now)
-
- // If there is no work yet, mark the inputs as idle.
- if (inputSize == 0) {
- demand = 0.0
- rate = 0.0
- return Long.MAX_VALUE
- }
-
- val capacity = capacity
- var availableCapacity = capacity
- var deadline = Long.MAX_VALUE
- var demand = 0.0
- var shouldRebuild = false
-
- // Pull in the work of the inputs
- for (i in 0 until inputSize) {
- val input = inputArray[i]
-
- input.pullSync(now)
-
- // Remove inputs that have finished
- if (!input.isActive) {
- input.actualRate = 0.0
- shouldRebuild = true
- } else {
- demand += input.limit
- deadline = min(deadline, input.deadline)
- }
- }
-
- // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
- if (shouldRebuild) {
- inputArray = activeInputs.toTypedArray()
- inputSize = inputArray.size
- _inputArray = inputArray
- }
-
- val rate = if (demand > capacity) {
- // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
- // constrained capacity across the inputs.
-
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- inputArray.sort()
-
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- for (i in 0 until inputSize) {
- val input = inputArray[i]
- val availableShare = availableCapacity / (inputSize - i)
- val grantedRate = min(input.allowedRate, availableShare)
-
- availableCapacity -= grantedRate
- input.actualRate = grantedRate
- }
-
- capacity - availableCapacity
- } else {
- demand
- }
-
- this.demand = demand
- if (this.rate != rate) {
- // Only update the outputs if the output rate has changed
- this.rate = rate
-
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- output.push(grantedSpeed)
- }
- }
-
- return deadline
- }
-
- /**
- * The previous capacity of the multiplexer.
- */
- private var _previousCapacity = 0.0
- private var _previousUpdate = Long.MIN_VALUE
-
- /**
- * Update the counters of the scheduler.
- */
- fun updateCounters(now: Long) {
- val previousCapacity = _previousCapacity
- _previousCapacity = capacity
-
- val previousUpdate = _previousUpdate
- _previousUpdate = now
-
- val delta = now - previousUpdate
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta * D_MS_TO_S
- val demand = demand
- val rate = rate
-
- counters.increment(
- demand = demand * deltaS,
- actual = rate * deltaS,
- remaining = (previousCapacity - rate) * deltaS
- )
- }
- }
-
- /**
- * An internal [FlowConsumer] implementation for multiplexer inputs.
- */
- private class Input(
- private val engine: FlowEngine,
- private val scheduler: Scheduler,
- @JvmField val isCoupled: Boolean,
- initialCapacity: Double
- ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
- /**
- * A flag to indicate that the consumer is active.
- */
- override val isActive: Boolean
- get() = _ctx != null
-
- /**
- * The demand of the consumer.
- */
- override val demand: Double
- get() = limit
-
- /**
- * The processing rate of the consumer.
- */
- override val rate: Double
- get() = actualRate
-
- /**
- * The capacity of the input.
- */
- override var capacity: Double
- get() = _capacity
- set(value) {
- allowedRate = min(limit, value)
- _capacity = value
- _ctx?.capacity = value
- }
- private var _capacity = initialCapacity
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- /**
- * A flag to enable timers for the input.
- */
- var enableTimers: Boolean = true
- set(value) {
- field = value
- _ctx?.enableTimers = value
- }
-
- /**
- * A flag to control whether the input should converge.
- */
- var shouldConsumerConverge: Boolean = true
- set(value) {
- field = value
- _ctx?.shouldConsumerConverge = value
- }
-
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- @JvmField var actualRate: Double = 0.0
-
- /**
- * The processing rate that is allowed by the model constraints.
- */
- @JvmField var allowedRate: Double = 0.0
-
- /**
- * The deadline of the input.
- */
- val deadline: Long
- get() = _ctx?.deadline ?: Long.MAX_VALUE
-
- /**
- * The [FlowConsumerContext] that is currently running.
- */
- private var _ctx: FlowConsumerContext? = null
-
- /**
- * A flag to indicate that the input is closed.
- */
- private var _isClosed: Boolean = false
-
- /**
- * Close the input.
- *
- * This method is invoked when the user removes an input from the switch.
- */
- fun close() {
- _isClosed = true
- cancel()
- }
-
- /**
- * Pull the source if necessary.
- */
- fun pullSync(now: Long) {
- _ctx?.pullSync(now)
- }
-
- /* FlowConsumer */
- override fun startConsumer(source: FlowSource) {
- check(!_isClosed) { "Cannot re-use closed input" }
- check(_ctx == null) { "Consumer is in invalid state" }
-
- val ctx = engine.newContext(source, this)
- _ctx = ctx
-
- ctx.capacity = capacity
- scheduler.registerInput(this)
-
- ctx.start()
- }
-
- override fun pull() {
- _ctx?.pull()
- }
-
- override fun cancel() {
- _ctx?.close()
- }
-
- /* FlowConsumerLogic */
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- rate: Double
- ) {
- doUpdateCounters(now)
-
- val allowed = min(rate, capacity)
- limit = rate
- actualRate = allowed
- allowedRate = allowed
-
- scheduler.trigger(now)
- }
-
- override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
- doUpdateCounters(now)
-
- limit = 0.0
- actualRate = 0.0
- allowedRate = 0.0
-
- scheduler.deregisterInput(this, now)
-
- _ctx = null
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long) {
- scheduler.convergeInput(this, now)
- }
-
- /* Comparable */
- override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
-
- /**
- * The timestamp that the counters where last updated.
- */
- private var _lastUpdate = Long.MIN_VALUE
-
- /**
- * Helper method to update the flow counters of the multiplexer.
- */
- fun doUpdateCounters(now: Long) {
- val lastUpdate = _lastUpdate
- _lastUpdate = now
-
- val delta = (now - lastUpdate).coerceAtLeast(0)
- if (delta <= 0L) {
- return
- }
-
- val actualRate = actualRate
-
- val deltaS = delta * D_MS_TO_S
- val demand = limit * deltaS
- val actual = actualRate * deltaS
- val remaining = (_capacity - actualRate) * deltaS
-
- _counters.increment(demand, actual, remaining)
- scheduler.counters.increment(0.0, 0.0, 0.0)
- }
- }
-
- /**
- * An internal [FlowSource] implementation for multiplexer outputs.
- */
- private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> {
- /**
- * The active [FlowConnection] of this source.
- */
- private var _conn: FlowConnection? = null
-
- /**
- * The capacity of this output.
- */
- @JvmField var capacity: Double = 0.0
-
- /**
- * A flag to indicate that this output is the activation output.
- */
- var isActivationOutput: Boolean
- get() = _isActivationOutput
- set(value) {
- _isActivationOutput = value
- _conn?.shouldSourceConverge = value
- }
- private var _isActivationOutput: Boolean = false
-
- /**
- * A flag to indicate that the output is active.
- */
- @JvmField var isActive = false
-
- /**
- * Push the specified rate to the consumer.
- */
- fun push(rate: Double) {
- _conn?.push(rate)
- }
-
- /**
- * Cancel this output.
- */
- fun cancel() {
- _conn?.close()
- }
-
- /**
- * Pull this output.
- */
- fun pull(now: Long) {
- _conn?.pull(now)
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- assert(_conn == null) { "Source running concurrently" }
- _conn = conn
- capacity = conn.capacity
- isActive = true
-
- scheduler.registerOutput(this)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _conn = null
- capacity = 0.0
- isActive = false
-
- scheduler.deregisterOutput(this, now)
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val capacity = capacity
- if (capacity != conn.capacity) {
- this.capacity = capacity
- scheduler.updateCapacity()
- }
-
- return if (_isActivationOutput) {
- // If this output is the activation output, synchronously run the scheduler and return the new deadline
- val deadline = scheduler.runScheduler(now)
- if (deadline == Long.MAX_VALUE) {
- deadline
- } else {
- deadline - now
- }
- } else {
- // Output is not the activation output, so trigger activation output and do not install timer for this
- // output (by returning `Long.MAX_VALUE`)
- scheduler.trigger(now)
-
- Long.MAX_VALUE
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- if (_isActivationOutput) {
- scheduler.convergeOutput(this, now)
- }
- }
-
- override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
deleted file mode 100644
index 6cfcc82c..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-import kotlin.math.roundToLong
-
-/**
- * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
- */
-public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
-
- init {
- require(amount >= 0.0) { "Amount must be positive" }
- require(utilization > 0.0) { "Utilization must be positive" }
- }
-
- private var remainingAmount = amount
- private var lastPull: Long = 0L
-
- override fun onStart(conn: FlowConnection, now: Long) {
- lastPull = now
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val lastPull = lastPull
- this.lastPull = now
- val delta = (now - lastPull).coerceAtLeast(0)
-
- val consumed = conn.rate * delta / 1000.0
- val limit = conn.capacity * utilization
-
- remainingAmount -= consumed
-
- val duration = (remainingAmount / limit * 1000).roundToLong()
-
- return if (duration > 0) {
- conn.push(limit)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
deleted file mode 100644
index b3191ad3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-/**
- * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
- * finish a pull, before proceeding its operation.
- */
-public class FlowSourceBarrier(public val parties: Int) {
- private var counter = 0
-
- /**
- * Enter the barrier and determine whether the caller is the last to reach the barrier.
- *
- * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
- */
- public fun enter(): Boolean {
- val last = ++counter == parties
- if (last) {
- counter = 0
- return true
- }
- return false
- }
-
- /**
- * Reset the barrier.
- */
- public fun reset() {
- counter = 0
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
deleted file mode 100644
index 80127fb5..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * Helper class to expose an observable [rate] field describing the flow rate of the source.
- */
-public class FlowSourceRateAdapter(
- private val delegate: FlowSource,
- private val callback: (Double) -> Unit = {}
-) : FlowSource by delegate {
- /**
- * The resource processing speed at this instant.
- */
- public var rate: Double = 0.0
- private set(value) {
- if (field != value) {
- callback(value)
- field = value
- }
- }
-
- init {
- callback(0.0)
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
-
- delegate.onStart(conn, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- try {
- delegate.onStop(conn, now)
- } finally {
- rate = 0.0
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return delegate.onPull(conn, now)
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- delegate.onConverge(conn, now)
- } finally {
- rate = conn.rate
- }
- }
-
- override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
deleted file mode 100644
index c9a52128..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
- */
-public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
- private var _iterator: Iterator<Fragment>? = null
- private var _nextTarget = Long.MIN_VALUE
-
- override fun onStart(conn: FlowConnection, now: Long) {
- check(_iterator == null) { "Source already running" }
- _iterator = trace.iterator()
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _iterator = null
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- // Check whether the trace fragment was fully consumed, otherwise wait until we have done so
- val nextTarget = _nextTarget
- if (nextTarget > now) {
- return now - nextTarget
- }
-
- val iterator = checkNotNull(_iterator)
- return if (iterator.hasNext()) {
- val fragment = iterator.next()
- _nextTarget = now + fragment.duration
- conn.push(fragment.usage)
- fragment.duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
-
- /**
- * A fragment of the trace.
- */
- public data class Fragment(val duration: Long, val usage: Double)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
deleted file mode 100644
index f89133dd..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowConsumerContextImpl] class.
- */
-class FlowConsumerContextTest {
- @Test
- fun testFlushWithoutCommand() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- engine.scheduleSync(engine.clock.millis(), context)
- }
-
- @Test
- fun testDoubleStart() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(0.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- context.start()
-
- assertThrows<IllegalStateException> {
- context.start()
- }
- }
-
- @Test
- fun testIdempotentCapacityChange() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
- context.capacity = 4200.0
- context.start()
- context.capacity = 4200.0
-
- verify(exactly = 1) { consumer.onPull(any(), any()) }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
deleted file mode 100644
index f75e5037..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Disabled
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowForwarder] class.
- */
-internal class FlowForwarderTest {
- @Test
- fun testCancelImmediately() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testCancel() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 10 * 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testState() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- assertFalse(forwarder.isActive)
-
- forwarder.startConsumer(consumer)
- assertTrue(forwarder.isActive)
-
- assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
-
- forwarder.cancel()
- assertFalse(forwarder.isActive)
-
- forwarder.close()
- assertFalse(forwarder.isActive)
- }
-
- @Test
- fun testCancelPendingDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
-
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.startConsumer(consumer)
- forwarder.cancel()
-
- verify(exactly = 0) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelStartedDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- forwarder.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- source.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testExitPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- source.startConsumer(forwarder)
- forwarder.consume(consumer)
- yield()
-
- assertFalse(forwarder.isActive)
- }
-
- @Test
- @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val sink = FlowSink(engine, 1.0)
-
- val source = spyk(FixedFlowSource(2.0, 1.0))
- sink.startConsumer(forwarder)
-
- coroutineScope {
- launch { forwarder.consume(source) }
- delay(1000)
- sink.capacity = 0.5
- }
-
- assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any()) }
- }
-
- @Test
- fun testCounters() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 1.0)
-
- val consumer = FixedFlowSource(2.0, 1.0)
- source.startConsumer(forwarder)
-
- forwarder.consume(consumer)
-
- yield()
-
- assertAll(
- { assertEquals(2.0, source.counters.actual) },
- { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
- { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
- { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
- { assertEquals(2000, clock.millis()) }
- )
- }
-
- @Test
- fun testCoupledExit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(FixedFlowSource(2000.0, 1.0))
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testPullFailureCoupled() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testStartFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-
- @Test
- fun testConvergeFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
deleted file mode 100644
index 746d752d..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowSink] class.
- */
-internal class FlowSinkTest {
- @Test
- fun testSpeed() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(4200.0, 1.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- @Test
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(engine, 1.0)
-
- val consumer = spyk(FixedFlowSource(2.0, 1.0))
-
- coroutineScope {
- launch { provider.consume(consumer) }
- delay(1000)
- provider.capacity = 0.5
- }
- assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any()) }
- }
-
- @Test
- fun testSpeedLimit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 2.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- /**
- * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
- * [FlowSource.onPull].
- */
- @Test
- fun testIntermediateInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.pull()
- }
- }
-
- provider.consume(consumer)
- }
-
- @Test
- fun testInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
- lateinit var resCtx: FlowConnection
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- resCtx = conn
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 4000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- launch {
- yield()
- resCtx.pull()
- }
- provider.consume(consumer)
-
- assertEquals(0, clock.millis())
- }
-
- @Test
- fun testFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Hi")
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testExceptionPropagationOnNext() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 1000
- } else {
- throw IllegalStateException()
- }
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testConcurrentConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- assertThrows<IllegalStateException> {
- coroutineScope {
- launch { provider.consume(consumer) }
- provider.consume(consumer)
- }
- }
- }
-
- @Test
- fun testCancelDuringConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- launch { provider.consume(consumer) }
- delay(500)
- provider.cancel()
-
- yield()
-
- assertEquals(500, clock.millis())
- }
-
- @Test
- fun testInfiniteSleep() {
- assertThrows<IllegalStateException> {
- runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
- }
-
- provider.consume(consumer)
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
deleted file mode 100644
index 2409e174..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [ForwardingFlowMultiplexer] class.
- */
-internal class ForwardingFlowMultiplexerTest {
- /**
- * Test a trace workload.
- */
- @Test
- fun testTrace() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val speed = mutableListOf<Double>()
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
- val forwarder = FlowForwarder(engine)
- val adapter = FlowSourceRateAdapter(forwarder, speed::add)
- source.startConsumer(adapter)
- forwarder.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertAll(
- { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
- { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
- )
- }
-
- /**
- * Test runtime workload on hypervisor.
- */
- @Test
- fun testRuntimeWorkload() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = FixedFlowSource(duration * 3.2, 1.0)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertEquals(duration, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test two workloads running sequentially.
- */
- @Test
- fun testTwoWorkloads() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- isFirst = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
- provider.consume(workload)
- assertEquals(duration * 2, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test concurrent workloads on the machine.
- */
- @Test
- fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- switch.newInput()
- assertThrows<IllegalStateException> { switch.newInput() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
deleted file mode 100644
index a6bf8ad8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [FlowMultiplexer] implementations
- */
-internal class MaxMinFlowMultiplexerTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(scheduler)
-
- val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { it.startConsumer(switch.newOutput()) }
-
- val provider = switch.newInput()
- val consumer = FixedFlowSource(2000.0, 1.0)
-
- try {
- provider.consume(consumer)
- yield()
- } finally {
- switch.clear()
- }
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with a single VM.
- */
- @Test
- fun testOvercommittedSingle() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val provider = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
- provider.consume(workload)
- yield()
- } finally {
- switch.clear()
- }
-
- assertAll(
- { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with two VMs.
- */
- @Test
- fun testOvercommittedDual() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workloadA =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
- val workloadB =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3100.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 73.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val providerA = switch.newInput()
- val providerB = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
-
- coroutineScope {
- launch { providerA.consume(workloadA) }
- providerB.consume(workloadB)
- }
-
- yield()
- } finally {
- switch.clear()
- }
- assertAll(
- { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
deleted file mode 100644
index 552579ff..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FixedFlowSource] class.
- */
-internal class FixedFlowSourceTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
-
- val consumer = FixedFlowSource(1.0, 1.0)
-
- provider.consume(consumer)
- assertEquals(1000, clock.millis())
- }
-
- @Test
- fun testUtilization() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
-
- val consumer = FixedFlowSource(1.0, 0.5)
-
- provider.consume(consumer)
- assertEquals(2000, clock.millis())
- }
-}