summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt137
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt62
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt57
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt48
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt264
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt75
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt160
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt44
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt436
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt119
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt218
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt200
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt124
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt60
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt177
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt811
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt66
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt52
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt107
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt331
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt245
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt158
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt150
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt57
33 files changed, 0 insertions, 4783 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
deleted file mode 100644
index 9e0a4a5e..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import kotlinx.coroutines.launch
-import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
-import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-import org.openjdk.jmh.annotations.Benchmark
-import org.openjdk.jmh.annotations.Fork
-import org.openjdk.jmh.annotations.Measurement
-import org.openjdk.jmh.annotations.Scope
-import org.openjdk.jmh.annotations.Setup
-import org.openjdk.jmh.annotations.State
-import org.openjdk.jmh.annotations.Warmup
-import java.util.concurrent.ThreadLocalRandom
-import java.util.concurrent.TimeUnit
-
-@State(Scope.Thread)
-@Fork(1)
-@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
-class FlowBenchmarks {
- private lateinit var trace: Sequence<TraceFlowSource.Fragment>
-
- @Setup
- fun setUp() {
- val random = ThreadLocalRandom.current()
- val entries = List(1000000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
- trace = entries.asSequence()
- }
-
- @Benchmark
- fun benchmarkSink() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val provider = FlowSink(engine, 4200.0)
- return@runSimulation provider.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkForward() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val provider = FlowSink(engine, 4200.0)
- val forwarder = FlowForwarder(engine)
- provider.startConsumer(forwarder)
- return@runSimulation forwarder.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkMuxMaxMinSingleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- return@runSimulation provider.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkMuxMaxMinTripleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- repeat(3) {
- launch {
- val provider = switch.newInput()
- provider.consume(TraceFlowSource(trace))
- }
- }
- }
- }
-
- @Benchmark
- fun benchmarkMuxExclusiveSingleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = ForwardingFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- return@runSimulation provider.consume(TraceFlowSource(trace))
- }
- }
-
- @Benchmark
- fun benchmarkMuxExclusiveTripleSource() {
- return runSimulation {
- val engine = FlowEngine(coroutineContext, clock)
- val switch = ForwardingFlowMultiplexer(engine)
-
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
- FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
-
- repeat(2) {
- launch {
- val provider = switch.newInput()
- provider.consume(TraceFlowSource(trace))
- }
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
deleted file mode 100644
index 8ff0bc76..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * An active connection between a [FlowSource] and [FlowConsumer].
- */
-public interface FlowConnection : AutoCloseable {
- /**
- * The capacity of the connection.
- */
- public val capacity: Double
-
- /**
- * The flow rate over the connection.
- */
- public val rate: Double
-
- /**
- * The flow demand of the source.
- */
- public val demand: Double
-
- /**
- * A flag to control whether [FlowSource.onConverge] should be invoked for this source.
- */
- public var shouldSourceConverge: Boolean
-
- /**
- * Pull the source.
- */
- public fun pull()
-
- /**
- * Pull the source.
- *
- * @param now The timestamp at which the connection is pulled.
- */
- public fun pull(now: Long)
-
- /**
- * Push the given flow [rate] over this connection.
- *
- * @param rate The rate of the flow to push.
- */
- public fun push(rate: Double)
-
- /**
- * Disconnect the consumer from its source.
- */
- public override fun close()
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
deleted file mode 100644
index a49826f4..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-
-/**
- * A consumer of a [FlowSource].
- */
-public interface FlowConsumer {
- /**
- * A flag to indicate that the consumer is currently consuming a [FlowSource].
- */
- public val isActive: Boolean
-
- /**
- * The flow capacity of this consumer.
- */
- public val capacity: Double
-
- /**
- * The current flow rate of the consumer.
- */
- public val rate: Double
-
- /**
- * The current flow demand.
- */
- public val demand: Double
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public val counters: FlowCounters
-
- /**
- * Start consuming the specified [source].
- *
- * @throws IllegalStateException if the consumer is already active.
- */
- public fun startConsumer(source: FlowSource)
-
- /**
- * Ask the consumer to pull its source.
- *
- * If the consumer is not active, this operation will be a no-op.
- */
- public fun pull()
-
- /**
- * Disconnect the consumer from its source.
- *
- * If the consumer is not active, this operation will be a no-op.
- */
- public fun cancel()
-}
-
-/**
- * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
- */
-public suspend fun FlowConsumer.consume(source: FlowSource) {
- return suspendCancellableCoroutine { cont ->
- startConsumer(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- try {
- source.onStart(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- try {
- source.onStop(conn, now)
-
- if (!cont.isCompleted) {
- cont.resume(Unit)
- }
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return try {
- source.onPull(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- source.onConverge(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun toString(): String = "SuspendingFlowSource"
- })
-
- cont.invokeOnCancellation { cancel() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
deleted file mode 100644
index 98922ab3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A controllable [FlowConnection].
- *
- * This interface is used by [FlowConsumer]s to control the connection between it and the source.
- */
-public interface FlowConsumerContext : FlowConnection {
- /**
- * The deadline of the source.
- */
- public val deadline: Long
-
- /**
- * The capacity of the connection.
- */
- public override var capacity: Double
-
- /**
- * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer.
- */
- public var shouldConsumerConverge: Boolean
-
- /**
- * A flag to control whether the timers for the [FlowSource] should be enabled.
- */
- public var enableTimers: Boolean
-
- /**
- * Start the flow over the connection.
- */
- public fun start()
-
- /**
- * Synchronously pull the source of the connection.
- *
- * @param now The timestamp at which the connection is pulled.
- */
- public fun pullSync(now: Long)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
deleted file mode 100644
index 1d3adb10..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A collection of callbacks associated with a [FlowConsumer].
- */
-public interface FlowConsumerLogic {
- /**
- * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param rate The requested processing rate of the source.
- */
- public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {}
-
- /**
- * This method is invoked when the flow graph has converged into a steady-state system.
- *
- * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method
- * will not be invoked.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the system converged.
- */
- public fun onConverge(ctx: FlowConsumerContext, now: Long) {}
-
- /**
- * This method is invoked when the [FlowSource] completed or failed.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param cause The cause of the failure or `null` if the source completed.
- */
- public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
deleted file mode 100644
index 62cb10d1..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A listener interface for when a flow stage has converged into a steady-state.
- */
-public interface FlowConvergenceListener {
- /**
- * This method is invoked when the system has converged to a steady-state.
- *
- * @param now The timestamp at which the system converged.
- */
- public fun onConverge(now: Long) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
deleted file mode 100644
index d8ad7978..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * An interface that tracks cumulative counts of the flow accumulation over a stage.
- */
-public interface FlowCounters {
- /**
- * The accumulated flow that a source wanted to push over the connection.
- */
- public val demand: Double
-
- /**
- * The accumulated flow that was actually transferred over the connection.
- */
- public val actual: Double
-
- /**
- * The amount of capacity that was not utilized.
- */
- public val remaining: Double
-
- /**
- * Reset the flow counters.
- */
- public fun reset()
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
deleted file mode 100644
index 65224827..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
- *
- * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
- * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
- */
-public interface FlowEngine {
- /**
- * The virtual [Clock] associated with this engine.
- */
- public val clock: Clock
-
- /**
- * Create a new [FlowConsumerContext] with the given [provider].
- *
- * @param consumer The consumer logic.
- * @param provider The logic of the resource provider.
- */
- public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
-
- /**
- * Start batching the execution of resource updates until [popBatch] is called.
- *
- * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
- * simultaneously) in a single state update.
- *
- * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
- * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
- * the same amount of times. To simplify batching, see [batch].
- */
- public fun pushBatch()
-
- /**
- * Stop the batching of resource updates and run the interpreter on the batch.
- *
- * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
- */
- public fun popBatch()
-
- public companion object {
- /**
- * Construct a new [FlowEngine] implementation.
- *
- * @param context The coroutine context to use.
- * @param clock The virtual simulation clock.
- */
- @JvmStatic
- @JvmName("create")
- public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
- return FlowEngineImpl(context, clock)
- }
- }
-}
-
-/**
- * Batch the execution of several interrupts into a single call.
- *
- * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
- */
-public inline fun FlowEngine.batch(block: () -> Unit) {
- try {
- pushBatch()
- block()
- } finally {
- popBatch()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
deleted file mode 100644
index 5202c252..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import mu.KotlinLogging
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * The logging instance of this connection.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
- *
- * @param engine The [FlowEngine] the forwarder runs in.
- * @param listener The convergence lister to use.
- * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
- */
-public class FlowForwarder(
- private val engine: FlowEngine,
- private val listener: FlowConvergenceListener? = null,
- private val isCoupled: Boolean = false
-) : FlowSource, FlowConsumer, AutoCloseable {
- /**
- * The delegate [FlowSource].
- */
- private var delegate: FlowSource? = null
-
- /**
- * A flag to indicate that the delegate was started.
- */
- private var hasDelegateStarted: Boolean = false
-
- /**
- * The exposed [FlowConnection].
- */
- private val _ctx = object : FlowConnection {
- override var shouldSourceConverge: Boolean = false
- set(value) {
- field = value
- _innerCtx?.shouldSourceConverge = value
- }
-
- override val capacity: Double
- get() = _innerCtx?.capacity ?: 0.0
-
- override val demand: Double
- get() = _innerCtx?.demand ?: 0.0
-
- override val rate: Double
- get() = _innerCtx?.rate ?: 0.0
-
- override fun pull() {
- _innerCtx?.pull()
- }
-
- override fun pull(now: Long) {
- _innerCtx?.pull(now)
- }
-
- override fun push(rate: Double) {
- if (delegate == null) {
- return
- }
-
- _innerCtx?.push(rate)
- _demand = rate
- }
-
- override fun close() {
- val delegate = delegate ?: return
- val hasDelegateStarted = hasDelegateStarted
-
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
-
- if (hasDelegateStarted) {
- val now = engine.clock.millis()
- delegate.onStop(this, now)
- }
- }
- }
-
- /**
- * The [FlowConnection] in which the forwarder runs.
- */
- private var _innerCtx: FlowConnection? = null
-
- override val isActive: Boolean
- get() = delegate != null
-
- override val capacity: Double
- get() = _ctx.capacity
-
- override val rate: Double
- get() = _ctx.rate
-
- override val demand: Double
- get() = _ctx.demand
-
- override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- override fun startConsumer(source: FlowSource) {
- check(delegate == null) { "Forwarder already active" }
-
- delegate = source
-
- // Pull to replace the source
- pull()
- }
-
- override fun pull() {
- _ctx.pull()
- }
-
- override fun cancel() {
- _ctx.close()
- }
-
- override fun close() {
- val ctx = _innerCtx
-
- if (ctx != null) {
- this._innerCtx = null
- ctx.pull()
- }
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- _innerCtx = conn
-
- if (listener != null || _ctx.shouldSourceConverge) {
- conn.shouldSourceConverge = true
- }
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _innerCtx = null
-
- val delegate = delegate
- if (delegate != null) {
- reset()
-
- try {
- delegate.onStop(this._ctx, now)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- }
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val delegate = delegate
-
- if (!hasDelegateStarted) {
- start()
- }
-
- updateCounters(conn, now)
-
- return try {
- delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- reset()
- Long.MAX_VALUE
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- delegate?.onConverge(this._ctx, now)
- listener?.onConverge(now)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- _innerCtx = null
- reset()
- }
- }
-
- /**
- * Start the delegate.
- */
- private fun start() {
- val delegate = delegate ?: return
-
- try {
- val now = engine.clock.millis()
- delegate.onStart(_ctx, now)
- hasDelegateStarted = true
- _lastUpdate = now
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- reset()
- }
- }
-
- /**
- * Reset the delegate.
- */
- private fun reset() {
- if (isCoupled) {
- _innerCtx?.close()
- } else {
- _innerCtx?.push(0.0)
- }
-
- delegate = null
- hasDelegateStarted = false
- }
-
- /**
- * The requested flow rate.
- */
- private var _demand: Double = 0.0
- private var _lastUpdate = 0L
-
- /**
- * Update the flow counters for the transformer.
- */
- private fun updateCounters(ctx: FlowConnection, now: Long) {
- val lastUpdate = _lastUpdate
- _lastUpdate = now
- val delta = now - lastUpdate
- if (delta <= 0) {
- return
- }
-
- val counters = _counters
- val deltaS = delta * D_MS_TO_S
- val total = ctx.capacity * deltaS
- val work = _demand * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.increment(work, actualWork, (total - actualWork))
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
deleted file mode 100644
index af702701..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A [FlowConsumer] that maps the pushed flow through [transform].
- *
- * @param source The source of the flow.
- * @param transform The method to transform the flow.
- */
-public class FlowMapper(
- private val source: FlowSource,
- private val transform: (FlowConnection, Double) -> Double
-) : FlowSource {
-
- /**
- * The current active connection.
- */
- private var _conn: Connection? = null
-
- override fun onStart(conn: FlowConnection, now: Long) {
- check(_conn == null) { "Concurrent access" }
- val delegate = Connection(conn, transform)
- _conn = delegate
- source.onStart(delegate, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- val delegate = checkNotNull(_conn) { "Invariant violation" }
- _conn = null
- source.onStop(delegate, now)
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val delegate = checkNotNull(_conn) { "Invariant violation" }
- return source.onPull(delegate, now)
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- val delegate = _conn ?: return
- source.onConverge(delegate, now)
- }
-
- /**
- * The wrapper [FlowConnection] that is used to transform the flow.
- */
- private class Connection(
- private val delegate: FlowConnection,
- private val transform: (FlowConnection, Double) -> Double
- ) : FlowConnection by delegate {
- override fun push(rate: Double) {
- delegate.push(transform(this, rate))
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
deleted file mode 100644
index ee8cd739..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * A [FlowSink] represents a sink with a fixed capacity.
- *
- * @param initialCapacity The initial capacity of the resource.
- * @param engine The engine that is used for driving the flow simulation.
- * @param parent The parent flow system.
- */
-public class FlowSink(
- private val engine: FlowEngine,
- initialCapacity: Double,
- private val parent: FlowConvergenceListener? = null
-) : FlowConsumer {
- /**
- * A flag to indicate that the flow consumer is active.
- */
- public override val isActive: Boolean
- get() = _ctx != null
-
- /**
- * The capacity of the consumer.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- _ctx?.capacity = value
- }
-
- /**
- * The current processing rate of the consumer.
- */
- public override val rate: Double
- get() = _ctx?.rate ?: 0.0
-
- /**
- * The flow processing rate demand at this instant.
- */
- public override val demand: Double
- get() = _ctx?.demand ?: 0.0
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- /**
- * The current active [FlowConsumerLogic] of this sink.
- */
- private var _ctx: FlowConsumerContext? = null
-
- override fun startConsumer(source: FlowSource) {
- check(_ctx == null) { "Consumer is in invalid state" }
-
- val ctx = engine.newContext(source, Logic(parent, _counters))
- _ctx = ctx
-
- ctx.capacity = capacity
- if (parent != null) {
- ctx.shouldConsumerConverge = true
- }
-
- ctx.start()
- }
-
- override fun pull() {
- _ctx?.pull()
- }
-
- override fun cancel() {
- _ctx?.close()
- }
-
- override fun toString(): String = "FlowSink[capacity=$capacity]"
-
- /**
- * [FlowConsumerLogic] of a sink.
- */
- private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
-
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- rate: Double
- ) {
- updateCounters(ctx, now, rate, ctx.capacity)
- }
-
- override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
- updateCounters(ctx, now, 0.0, 0.0)
-
- _ctx = null
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long) {
- parent?.onConverge(now)
- }
-
- /**
- * The previous demand and capacity for the consumer.
- */
- private val _previous = DoubleArray(2)
- private var _previousUpdate = Long.MAX_VALUE
-
- /**
- * Update the counters of the flow consumer.
- */
- private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) {
- val previousUpdate = _previousUpdate
- _previousUpdate = now
- val delta = now - previousUpdate
-
- val counters = counters
- val previous = _previous
- val demand = previous[0]
- val capacity = previous[1]
-
- previous[0] = nextDemand
- previous[1] = nextCapacity
-
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta * D_MS_TO_S
- val total = demand * deltaS
- val work = capacity * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.increment(work, actualWork, (total - actualWork))
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
deleted file mode 100644
index a48ac18e..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A source of flow that is consumed by a [FlowConsumer].
- *
- * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
- * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
- */
-public interface FlowSource {
- /**
- * This method is invoked when the source is started.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the provider finished.
- */
- public fun onStart(conn: FlowConnection, now: Long) {}
-
- /**
- * This method is invoked when the source is finished.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the source finished.
- */
- public fun onStop(conn: FlowConnection, now: Long) {}
-
- /**
- * This method is invoked when the source is pulled.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the pull is occurring.
- * @return The duration after which the resource consumer should be pulled again.
- */
- public fun onPull(conn: FlowConnection, now: Long): Long
-
- /**
- * This method is invoked when the flow graph has converged into a steady-state system.
- *
- * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method
- * will not be invoked.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the system converged.
- */
- public fun onConverge(conn: FlowConnection, now: Long) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
deleted file mode 100644
index 450195ec..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * Constant for converting milliseconds into seconds.
- */
-internal const val D_MS_TO_S = 1 / 1000.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
deleted file mode 100644
index 97d56fff..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * States of the flow connection.
- */
-internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source
-internal const val ConnActive = 1 // Connection is active and the source is currently being consumed
-internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore
-internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection
-
-/**
- * Flags of the flow connection
- */
-internal const val ConnPulled = 1 shl 2 // The source should be pulled
-internal const val ConnPushed = 1 shl 3 // The source has pushed a value
-internal const val ConnClose = 1 shl 4 // The connection should be closed
-internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active
-internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending
-internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
-internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
-internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
-internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
deleted file mode 100644
index fba3af5f..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import mu.KotlinLogging
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.batch
-import java.util.ArrayDeque
-import kotlin.math.min
-
-/**
- * The logging instance of this connection.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
- */
-internal class FlowConsumerContextImpl(
- private val engine: FlowEngineImpl,
- private val source: FlowSource,
- private val logic: FlowConsumerLogic
-) : FlowConsumerContext {
- /**
- * The capacity of the connection.
- */
- override var capacity: Double
- get() = _capacity
- set(value) {
- val oldValue = _capacity
-
- // Only changes will be propagated
- if (value != oldValue) {
- _capacity = value
- pull()
- }
- }
- private var _capacity: Double = 0.0
-
- /**
- * The current processing rate of the connection.
- */
- override val rate: Double
- get() = _rate
- private var _rate = 0.0
-
- /**
- * The current flow processing demand.
- */
- override val demand: Double
- get() = _demand
- private var _demand: Double = 0.0 // The current (pending) demand of the source
-
- /**
- * The deadline of the source.
- */
- override val deadline: Long
- get() = _deadline
- private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
-
- /**
- * Flags to control the convergence of the consumer and source.
- */
- override var shouldSourceConverge: Boolean
- get() = _flags and ConnConvergeSource == ConnConvergeSource
- set(value) {
- _flags =
- if (value) {
- _flags or ConnConvergeSource
- } else {
- _flags and ConnConvergeSource.inv()
- }
- }
- override var shouldConsumerConverge: Boolean
- get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer
- set(value) {
- _flags =
- if (value) {
- _flags or ConnConvergeConsumer
- } else {
- _flags and ConnConvergeConsumer.inv()
- }
- }
-
- /**
- * Flag to control the timers on the [FlowSource]
- */
- override var enableTimers: Boolean
- get() = _flags and ConnDisableTimers != ConnDisableTimers
- set(value) {
- _flags =
- if (!value) {
- _flags or ConnDisableTimers
- } else {
- _flags and ConnDisableTimers.inv()
- }
- }
-
- /**
- * The clock to track simulation time.
- */
- private val _clock = engine.clock
-
- /**
- * The flags of the flow connection, indicating certain actions.
- */
- private var _flags: Int = 0
-
- /**
- * The timers at which the context is scheduled to be interrupted.
- */
- private var _timer: Long = Long.MAX_VALUE
- private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
-
- override fun start() {
- check(_flags and ConnState == ConnPending) { "Consumer is already started" }
- engine.batch {
- val now = _clock.millis()
- source.onStart(this, now)
-
- // Mark the connection as active and pulled
- val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled
- scheduleImmediate(now, newFlags)
- }
- }
-
- override fun close() {
- val flags = _flags
- if (flags and ConnState == ConnClosed) {
- return
- }
-
- // Toggle the close bit. In case no update is active, schedule a new update.
- if (flags and ConnUpdateActive == 0) {
- val now = _clock.millis()
- scheduleImmediate(now, flags or ConnClose)
- } else {
- _flags = flags or ConnClose
- }
- }
-
- override fun pull(now: Long) {
- val flags = _flags
- if (flags and ConnState != ConnActive) {
- return
- }
-
- // Mark connection as pulled
- scheduleImmediate(now, flags or ConnPulled)
- }
-
- override fun pull() {
- pull(_clock.millis())
- }
-
- override fun pullSync(now: Long) {
- val flags = _flags
-
- // Do not attempt to flush the connection if the connection is closed or an update is already active
- if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) {
- return
- }
-
- if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
- engine.scheduleSync(now, this)
- }
- }
-
- override fun push(rate: Double) {
- if (_demand == rate) {
- return
- }
-
- _demand = rate
-
- val flags = _flags
-
- if (flags and ConnUpdateActive != 0) {
- // If an update is active, it will already get picked up at the end of the update
- _flags = flags or ConnPushed
- } else {
- // Invalidate only if no update is active
- scheduleImmediate(_clock.millis(), flags or ConnPushed)
- }
- }
-
- /**
- * Update the state of the flow connection.
- *
- * @param now The current virtual timestamp.
- * @param visited The queue of connections that have been visited during the cycle.
- * @param timerQueue The queue of all pending timers.
- * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update.
- */
- fun doUpdate(
- now: Long,
- visited: FlowDeque,
- timerQueue: FlowTimerQueue,
- isImmediate: Boolean
- ) {
- var flags = _flags
-
- // Precondition: The flow connection must be active
- if (flags and ConnState != ConnActive) {
- return
- }
-
- val deadline = _deadline
- val reachedDeadline = deadline == now
- var newDeadline: Long
- var hasUpdated = false
-
- try {
- // Pull the source if (1) `pull` is called or (2) the timer of the source has expired
- newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
- // Update state before calling into the outside world, so it observes a consistent state
- _flags = (flags and ConnPulled.inv()) or ConnUpdateActive
- hasUpdated = true
-
- val duration = source.onPull(this, now)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = _flags
-
- if (duration != Long.MAX_VALUE) {
- now + duration
- } else {
- duration
- }
- } else {
- deadline
- }
-
- // Make the new deadline available for the consumer if it has changed
- if (newDeadline != deadline) {
- _deadline = newDeadline
- }
-
- // Push to the consumer if the rate of the source has changed (after a call to `push`)
- if (flags and ConnPushed != 0) {
- // Update state before calling into the outside world, so it observes a consistent state
- _flags = (flags and ConnPushed.inv()) or ConnUpdateActive
- hasUpdated = true
-
- logic.onPush(this, now, _demand)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = _flags
- }
-
- // Check whether the source or consumer have tried to close the connection
- if (flags and ConnClose != 0) {
- hasUpdated = true
-
- // The source has called [FlowConnection.close], so clean up the connection
- doStopSource(now)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- // We now also mark the connection as closed
- flags = (_flags and ConnState.inv()) or ConnClosed
-
- _demand = 0.0
- newDeadline = Long.MAX_VALUE
- }
- } catch (cause: Throwable) {
- hasUpdated = true
-
- // Clean up the connection
- doFailSource(now, cause)
-
- // Mark the connection as closed
- flags = (flags and ConnState.inv()) or ConnClosed
-
- _demand = 0.0
- newDeadline = Long.MAX_VALUE
- }
-
- // Check whether the connection needs to be added to the visited queue. This is the case when:
- // (1) An update was performed (either a push or a pull)
- // (2) Either the source or consumer want to converge, and
- // (3) Convergence is not already pending (ConnConvergePending)
- if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) {
- visited.add(this)
- flags = flags or ConnConvergePending
- }
-
- // Compute the new flow rate of the connection
- // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value
- _rate = min(_capacity, _demand)
-
- // Indicate that no update is active anymore and flush the flags
- _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv()
-
- val pendingTimers = _pendingTimers
-
- // Prune the head timer if this is a delayed update
- val timer = if (!isImmediate) {
- // Invariant: Any pending timer should only point to a future timestamp
- val timer = pendingTimers.poll() ?: Long.MAX_VALUE
- _timer = timer
- timer
- } else {
- _timer
- }
-
- // Check whether we need to schedule a new timer for this connection. That is the case when:
- // (1) The deadline is valid (not the maximum value)
- // (2) The connection is active
- // (3) Timers are not disabled for the source
- // (4) The current active timer for the connection points to a later deadline
- if (newDeadline == Long.MAX_VALUE ||
- flags and ConnState != ConnActive ||
- flags and ConnDisableTimers != 0 ||
- (timer != Long.MAX_VALUE && newDeadline >= timer)
- ) {
- // Ignore any deadline scheduled at the maximum value
- // This indicates that the source does not want to register a timer
- return
- }
-
- // Construct a timer for the new deadline and add it to the global queue of timers
- _timer = newDeadline
- timerQueue.add(this, newDeadline)
-
- // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
- if (timer != Long.MAX_VALUE) {
- pendingTimers.addFirst(timer)
- }
- }
-
- /**
- * This method is invoked when the system converges into a steady state.
- */
- fun onConverge(now: Long) {
- try {
- val flags = _flags
-
- // The connection is converging now, so unset the convergence pending flag
- _flags = flags and ConnConvergePending.inv()
-
- // Call the source converge callback if it has enabled convergence
- if (flags and ConnConvergeSource != 0) {
- source.onConverge(this, now)
- }
-
- // Call the consumer callback if it has enabled convergence
- if (flags and ConnConvergeConsumer != 0) {
- logic.onConverge(this, now)
- }
- } catch (cause: Throwable) {
- // Invoke the finish callbacks
- doFailSource(now, cause)
-
- // Mark the connection as closed
- _flags = (_flags and ConnState.inv()) or ConnClosed
- _demand = 0.0
- _deadline = Long.MAX_VALUE
- }
- }
-
- override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
-
- /**
- * Stop the [FlowSource].
- */
- private fun doStopSource(now: Long) {
- try {
- source.onStop(this, now)
- doFinishConsumer(now, null)
- } catch (cause: Throwable) {
- doFinishConsumer(now, cause)
- }
- }
-
- /**
- * Fail the [FlowSource].
- */
- private fun doFailSource(now: Long, cause: Throwable) {
- try {
- source.onStop(this, now)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- doFinishConsumer(now, e)
- }
- }
-
- /**
- * Finish the consumer.
- */
- private fun doFinishConsumer(now: Long, cause: Throwable?) {
- try {
- logic.onFinish(this, now, cause)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- logger.error(e) { "Uncaught exception" }
- }
- }
-
- /**
- * Schedule an immediate update for this connection.
- */
- private fun scheduleImmediate(now: Long, flags: Int) {
- // In case an immediate update is already scheduled, no need to do anything
- if (flags and ConnUpdatePending != 0) {
- _flags = flags
- return
- }
-
- // Mark the connection that there is an update pending
- _flags = flags or ConnUpdatePending
-
- engine.scheduleImmediate(now, this)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
deleted file mode 100644
index 403a9aec..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import java.util.ArrayDeque
-
-/**
- * A specialized [ArrayDeque] that tracks the [FlowConsumerContextImpl] instances that have updated in an interpreter
- * cycle.
- *
- * By using a specialized class, we reduce the overhead caused by type-erasure.
- */
-internal class FlowDeque(initialCapacity: Int = 256) {
- /**
- * The array of elements in the queue.
- */
- private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
- private var _head = 0
- private var _tail = 0
-
- /**
- * Determine whether this queue is not empty.
- */
- fun isNotEmpty(): Boolean {
- return _head != _tail
- }
-
- /**
- * Add the specified [ctx] to the queue.
- */
- fun add(ctx: FlowConsumerContextImpl) {
- val es = _elements
- var tail = _tail
-
- es[tail] = ctx
-
- tail = inc(tail, es.size)
- _tail = tail
-
- if (_head == tail) {
- doubleCapacity()
- }
- }
-
- /**
- * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
- */
- fun poll(): FlowConsumerContextImpl? {
- val es = _elements
- val head = _head
- val ctx = es[head]
-
- if (ctx != null) {
- es[head] = null
- _head = inc(head, es.size)
- }
-
- return ctx
- }
-
- /**
- * Clear the queue.
- */
- fun clear() {
- _elements.fill(null)
- _head = 0
- _tail = 0
- }
-
- private fun inc(i: Int, modulus: Int): Int {
- var x = i
- if (++x >= modulus) {
- x = 0
- }
- return x
- }
-
- /**
- * Doubles the capacity of this deque
- */
- private fun doubleCapacity() {
- assert(_head == _tail)
- val p = _head
- val n = _elements.size
- val r = n - p // number of elements to the right of p
-
- val newCapacity = n shl 1
- check(newCapacity >= 0) { "Sorry, deque too big" }
-
- val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
-
- _elements.copyInto(a, 0, p, n)
- _elements.copyInto(a, r, 0, p)
-
- _elements = a
- _head = 0
- _tail = n
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
deleted file mode 100644
index 6fd1ef31..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Runnable
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
-import java.time.Clock
-import java.util.ArrayDeque
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Internal implementation of the [FlowEngine] interface.
- *
- * @param context The coroutine context to use.
- * @param clock The virtual simulation clock.
- */
-internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable {
- /**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
-
- /**
- * The queue of connection updates that are scheduled for immediate execution.
- */
- private val queue = FlowDeque()
-
- /**
- * A priority queue containing the connection updates to be scheduled in the future.
- */
- private val futureQueue = FlowTimerQueue()
-
- /**
- * The stack of engine invocations to occur in the future.
- */
- private val futureInvocations = ArrayDeque<Invocation>()
-
- /**
- * The systems that have been visited during the engine cycle.
- */
- private val visited = FlowDeque()
-
- /**
- * The index in the batch stack.
- */
- private var batchIndex = 0
-
- /**
- * The virtual [Clock] of this engine.
- */
- override val clock: Clock
- get() = _clock
- private val _clock: Clock = clock
-
- /**
- * Update the specified [ctx] synchronously.
- */
- fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
-
- // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
- // up by the active engine.
- if (batchIndex > 0) {
- return
- }
-
- doRunEngine(now)
- }
-
- /**
- * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
- *
- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
- * re-computed. In case no engine is currently active, the engine will be started.
- */
- fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
- queue.add(ctx)
-
- // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
- // up by the active engine.
- if (batchIndex > 0) {
- return
- }
-
- doRunEngine(now)
- }
-
- override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
-
- override fun pushBatch() {
- batchIndex++
- }
-
- override fun popBatch() {
- try {
- // Flush the work if the engine is not already running
- if (batchIndex == 1 && queue.isNotEmpty()) {
- doRunEngine(_clock.millis())
- }
- } finally {
- batchIndex--
- }
- }
-
- /* Runnable */
- override fun run() {
- val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
- doRunEngine(invocation.timestamp)
- }
-
- /**
- * Run all the enqueued actions for the specified [timestamp][now].
- */
- private fun doRunEngine(now: Long) {
- val queue = queue
- val futureQueue = futureQueue
- val futureInvocations = futureInvocations
- val visited = visited
-
- try {
- // Increment batch index so synchronous calls will not launch concurrent engine invocations
- batchIndex++
-
- // Execute all scheduled updates at current timestamp
- while (true) {
- val ctx = futureQueue.poll(now) ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
- }
-
- // Repeat execution of all immediate updates until the system has converged to a steady-state
- // We have to take into account that the onConverge callback can also trigger new actions.
- do {
- // Execute all immediate updates
- while (true) {
- val ctx = queue.poll() ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
- }
-
- while (true) {
- val ctx = visited.poll() ?: break
- ctx.onConverge(now)
- }
- } while (queue.isNotEmpty())
- } finally {
- // Decrement batch index to indicate no engine is active at the moment
- batchIndex--
- }
-
- // Schedule an engine invocation for the next update to occur.
- val headDeadline = futureQueue.peekDeadline()
- if (headDeadline != Long.MAX_VALUE) {
- trySchedule(now, futureInvocations, headDeadline)
- }
- }
-
- /**
- * Try to schedule an engine invocation at the specified [target].
- *
- * @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the engine invocation should happen.
- * @param scheduled The queue of scheduled invocations.
- */
- private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- val head = scheduled.peek()
-
- // Only schedule a new scheduler invocation in case the target is earlier than all other pending
- // scheduler invocations
- if (head == null || target < head.timestamp) {
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, this, context)
- scheduled.addFirst(Invocation(target, handle))
- }
- }
-
- /**
- * A future engine invocation.
- *
- * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
- * the invocation is not needed anymore, it can be cancelled via [cancel].
- */
- private class Invocation(
- @JvmField val timestamp: Long,
- @JvmField val handle: DisposableHandle
- ) {
- /**
- * Cancel the engine invocation.
- */
- fun cancel() = handle.dispose()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
deleted file mode 100644
index 47061a91..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * Specialized priority queue for flow timers.
- *
- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
- * being generic.
- */
-internal class FlowTimerQueue(initialCapacity: Int = 256) {
- /**
- * The binary heap of deadlines.
- */
- private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
-
- /**
- * The binary heap of [FlowConsumerContextImpl]s.
- */
- private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
-
- /**
- * The number of elements in the priority queue.
- */
- private var size = 0
-
- /**
- * Register a timer for [ctx] with [deadline].
- */
- fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
- val i = size
- var deadlines = _deadlines
- if (i >= deadlines.size) {
- grow()
- // Re-fetch the resized array
- deadlines = _deadlines
- }
-
- siftUp(deadlines, _pending, i, ctx, deadline)
-
- size = i + 1
- }
-
- /**
- * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
- */
- fun poll(now: Long): FlowConsumerContextImpl? {
- if (size == 0) {
- return null
- }
-
- val deadlines = _deadlines
- val deadline = deadlines[0]
-
- if (now < deadline) {
- return null
- }
-
- val pending = _pending
- val res = pending[0]
- val s = --size
-
- val nextDeadline = deadlines[s]
- val next = pending[s]!!
-
- // Clear the last element of the queue
- pending[s] = null
- deadlines[s] = Long.MIN_VALUE
-
- if (s != 0) {
- siftDown(deadlines, pending, next, nextDeadline)
- }
-
- return res
- }
-
- /**
- * Find the earliest deadline in the queue.
- */
- fun peekDeadline(): Long {
- return if (size == 0) Long.MAX_VALUE else _deadlines[0]
- }
-
- /**
- * Increases the capacity of the array.
- */
- private fun grow() {
- val oldCapacity = _deadlines.size
- // Double size if small; else grow by 50%
- val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
-
- _deadlines = _deadlines.copyOf(newCapacity)
- _pending = _pending.copyOf(newCapacity)
- }
-
- /**
- * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
- * greater than or equal to its parent, or is the root.
- *
- * @param deadlines The heap of deadlines.
- * @param pending The heap of contexts.
- * @param pos The position to fill.
- * @param ctx The [FlowConsumerContextImpl] to insert.
- * @param deadline The deadline of the context.
- */
- private fun siftUp(
- deadlines: LongArray,
- pending: Array<FlowConsumerContextImpl?>,
- pos: Int,
- ctx: FlowConsumerContextImpl,
- deadline: Long
- ) {
- var k = pos
-
- while (k > 0) {
- val parent = (k - 1) ushr 1
- val parentDeadline = deadlines[parent]
-
- if (deadline >= parentDeadline) {
- break
- }
-
- deadlines[k] = parentDeadline
- pending[k] = pending[parent]
-
- k = parent
- }
-
- deadlines[k] = deadline
- pending[k] = ctx
- }
-
- /**
- * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
- * is less than or equal to its children or is a leaf.
- *
- * @param deadlines The heap of deadlines.
- * @param pending The heap of contexts.
- * @param ctx The [FlowConsumerContextImpl] to insert.
- * @param deadline The deadline of the context.
- */
- private fun siftDown(
- deadlines: LongArray,
- pending: Array<FlowConsumerContextImpl?>,
- ctx: FlowConsumerContextImpl,
- deadline: Long
- ) {
- var k = 0
- val size = size
- val half = size ushr 1
-
- while (k < half) {
- var child = (k shl 1) + 1
-
- var childDeadline = deadlines[child]
- val right = child + 1
-
- if (right < size) {
- val rightDeadline = deadlines[right]
-
- if (childDeadline > rightDeadline) {
- child = right
- childDeadline = rightDeadline
- }
- }
-
- if (deadline <= childDeadline) {
- break
- }
-
- deadlines[k] = childDeadline
- pending[k] = pending[child]
-
- k = child
- }
-
- deadlines[k] = deadline
- pending[k] = ctx
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
deleted file mode 100644
index c320a362..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import org.opendc.simulator.flow.FlowCounters
-
-/**
- * Mutable implementation of the [FlowCounters] interface.
- */
-public class MutableFlowCounters : FlowCounters {
- override val demand: Double
- get() = _counters[0]
- override val actual: Double
- get() = _counters[1]
- override val remaining: Double
- get() = _counters[2]
- private val _counters = DoubleArray(3)
-
- override fun reset() {
- _counters.fill(0.0)
- }
-
- public fun increment(demand: Double, actual: Double, remaining: Double) {
- val counters = _counters
- counters[0] += demand
- counters[1] += actual
- counters[2] += remaining
- }
-
- override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
deleted file mode 100644
index 8752c559..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
- */
-public interface FlowMultiplexer {
- /**
- * The maximum number of inputs supported by the multiplexer.
- */
- public val maxInputs: Int
-
- /**
- * The maximum number of outputs supported by the multiplexer.
- */
- public val maxOutputs: Int
-
- /**
- * The inputs of the multiplexer that can be used to consume sources.
- */
- public val inputs: Set<FlowConsumer>
-
- /**
- * The outputs of the multiplexer over which the flows will be distributed.
- */
- public val outputs: Set<FlowSource>
-
- /**
- * The actual processing rate of the multiplexer.
- */
- public val rate: Double
-
- /**
- * The demanded processing rate of the input.
- */
- public val demand: Double
-
- /**
- * The capacity of the outputs.
- */
- public val capacity: Double
-
- /**
- * The flow counters to track the flow metrics of all multiplexer inputs.
- */
- public val counters: FlowCounters
-
- /**
- * Create a new input on this multiplexer with a coupled capacity.
- */
- public fun newInput(): FlowConsumer
-
- /**
- * Create a new input on this multiplexer with the specified [capacity].
- *
- * @param capacity The capacity of the input.
- */
- public fun newInput(capacity: Double): FlowConsumer
-
- /**
- * Remove [input] from this multiplexer.
- */
- public fun removeInput(input: FlowConsumer)
-
- /**
- * Create a new output on this multiplexer.
- */
- public fun newOutput(): FlowSource
-
- /**
- * Remove [output] from this multiplexer.
- */
- public fun removeOutput(output: FlowSource)
-
- /**
- * Clear all inputs and outputs from the multiplexer.
- */
- public fun clear()
-
- /**
- * Clear the inputs of the multiplexer.
- */
- public fun clearInputs()
-
- /**
- * Clear the outputs of the multiplexer.
- */
- public fun clearOutputs()
-
- /**
- * Flush the counters of the multiplexer.
- */
- public fun flushCounters()
-
- /**
- * Flush the counters of the specified [input].
- */
- public fun flushCounters(input: FlowConsumer)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
deleted file mode 100644
index a863e3ad..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-
-/**
- * Factory interface for a [FlowMultiplexer] implementation.
- */
-public fun interface FlowMultiplexerFactory {
- /**
- * Construct a new [FlowMultiplexer] using the specified [engine] and [listener].
- */
- public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer
-
- public companion object {
- /**
- * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer].
- */
- private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) }
-
- /**
- * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer].
- */
- private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) }
-
- /**
- * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances.
- */
- @JvmStatic
- public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY
-
- /**
- * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances.
- */
- @JvmStatic
- public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
deleted file mode 100644
index 53f94a94..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSource
-import java.util.ArrayDeque
-
-/**
- * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
- * that a single input is directly connected to an output and that the multiplexer can only support as many
- * inputs as outputs.
- *
- * @param engine The [FlowEngine] driving the simulation.
- * @param listener The convergence listener of the multiplexer.
- */
-public class ForwardingFlowMultiplexer(
- private val engine: FlowEngine,
- private val listener: FlowConvergenceListener? = null
-) : FlowMultiplexer, FlowConvergenceListener {
-
- override val maxInputs: Int
- get() = _outputs.size
-
- override val maxOutputs: Int = Int.MAX_VALUE
-
- override val inputs: Set<FlowConsumer>
- get() = _inputs
- private val _inputs = mutableSetOf<Input>()
-
- override val outputs: Set<FlowSource>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
- private val _availableOutputs = ArrayDeque<Output>()
-
- override val counters: FlowCounters = object : FlowCounters {
- override val demand: Double
- get() = _outputs.sumOf { it.forwarder.counters.demand }
- override val actual: Double
- get() = _outputs.sumOf { it.forwarder.counters.actual }
- override val remaining: Double
- get() = _outputs.sumOf { it.forwarder.counters.remaining }
-
- override fun reset() {
- for (output in _outputs) {
- output.forwarder.counters.reset()
- }
- }
-
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
- }
-
- override val rate: Double
- get() = _outputs.sumOf { it.forwarder.rate }
-
- override val demand: Double
- get() = _outputs.sumOf { it.forwarder.demand }
-
- override val capacity: Double
- get() = _outputs.sumOf { it.forwarder.capacity }
-
- override fun newInput(): FlowConsumer {
- val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val input = Input(output)
- _inputs += input
- return input
- }
-
- override fun newInput(capacity: Double): FlowConsumer = newInput()
-
- override fun removeInput(input: FlowConsumer) {
- if (!_inputs.remove(input)) {
- return
- }
-
- val output = (input as Input).output
- output.forwarder.cancel()
- _availableOutputs += output
- }
-
- override fun newOutput(): FlowSource {
- val forwarder = FlowForwarder(engine, this)
- val output = Output(forwarder)
-
- _outputs += output
- return output
- }
-
- override fun removeOutput(output: FlowSource) {
- if (!_outputs.remove(output)) {
- return
- }
-
- val forwarder = (output as Output).forwarder
- forwarder.close()
- }
-
- override fun clearInputs() {
- for (input in _inputs) {
- val output = input.output
- output.forwarder.cancel()
- _availableOutputs += output
- }
-
- _inputs.clear()
- }
-
- override fun clearOutputs() {
- for (output in _outputs) {
- output.forwarder.cancel()
- }
- _outputs.clear()
- _availableOutputs.clear()
- }
-
- override fun clear() {
- clearOutputs()
- clearInputs()
- }
-
- override fun flushCounters() {}
-
- override fun flushCounters(input: FlowConsumer) {}
-
- override fun onConverge(now: Long) {
- listener?.onConverge(now)
- }
-
- /**
- * An input on the multiplexer.
- */
- private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
- }
-
- /**
- * An output on the multiplexer.
- */
- private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
- override fun onStart(conn: FlowConnection, now: Long) {
- _availableOutputs += this
- forwarder.onStart(conn, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- forwarder.cancel()
- forwarder.onStop(conn, now)
- }
-
- override fun toString(): String = "ForwardingFlowMultiplexer.Output"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
deleted file mode 100644
index d9c6f893..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ /dev/null
@@ -1,811 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.min
-
-/**
- * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
- *
- * @param engine The [FlowEngine] to drive the flow simulation.
- * @param parent The parent flow system of the multiplexer.
- */
-public class MaxMinFlowMultiplexer(
- private val engine: FlowEngine,
- parent: FlowConvergenceListener? = null
-) : FlowMultiplexer {
-
- override val maxInputs: Int = Int.MAX_VALUE
-
- override val maxOutputs: Int = Int.MAX_VALUE
-
- /**
- * The inputs of the multiplexer.
- */
- override val inputs: Set<FlowConsumer>
- get() = _inputs
- private val _inputs = mutableSetOf<Input>()
-
- /**
- * The outputs of the multiplexer.
- */
- override val outputs: Set<FlowSource>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
-
- /**
- * The flow counters of this multiplexer.
- */
- public override val counters: FlowCounters
- get() = scheduler.counters
-
- /**
- * The actual processing rate of the multiplexer.
- */
- public override val rate: Double
- get() = scheduler.rate
-
- /**
- * The demanded processing rate of the input.
- */
- public override val demand: Double
- get() = scheduler.demand
-
- /**
- * The capacity of the outputs.
- */
- public override val capacity: Double
- get() = scheduler.capacity
-
- /**
- * The [Scheduler] instance of this multiplexer.
- */
- private val scheduler = Scheduler(engine, parent)
-
- override fun newInput(): FlowConsumer {
- return newInput(isCoupled = true, Double.POSITIVE_INFINITY)
- }
-
- override fun newInput(capacity: Double): FlowConsumer {
- return newInput(isCoupled = false, capacity)
- }
-
- private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer {
- val provider = Input(engine, scheduler, isCoupled, initialCapacity)
- _inputs.add(provider)
- return provider
- }
-
- override fun removeInput(input: FlowConsumer) {
- if (!_inputs.remove(input)) {
- return
- }
- // This cast should always succeed since only `Input` instances should be added to `_inputs`
- (input as Input).close()
- }
-
- override fun newOutput(): FlowSource {
- val output = Output(scheduler)
- _outputs.add(output)
- return output
- }
-
- override fun removeOutput(output: FlowSource) {
- if (!_outputs.remove(output)) {
- return
- }
-
- // This cast should always succeed since only `Output` instances should be added to `_outputs`
- (output as Output).cancel()
- }
-
- override fun clearInputs() {
- for (input in _inputs) {
- input.cancel()
- }
- _inputs.clear()
- }
-
- override fun clearOutputs() {
- for (output in _outputs) {
- output.cancel()
- }
- _outputs.clear()
- }
-
- override fun clear() {
- clearOutputs()
- clearInputs()
- }
-
- override fun flushCounters() {
- scheduler.updateCounters(engine.clock.millis())
- }
-
- override fun flushCounters(input: FlowConsumer) {
- (input as Input).doUpdateCounters(engine.clock.millis())
- }
-
- /**
- * Helper class containing the scheduler state.
- */
- private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
- /**
- * The flow counters of this scheduler.
- */
- @JvmField val counters = MutableFlowCounters()
-
- /**
- * The flow rate of the multiplexer.
- */
- @JvmField var rate = 0.0
-
- /**
- * The demand for the multiplexer.
- */
- @JvmField var demand = 0.0
-
- /**
- * The capacity of the multiplexer.
- */
- @JvmField var capacity = 0.0
-
- /**
- * An [Output] that is used to activate the scheduler.
- */
- @JvmField var activationOutput: Output? = null
-
- /**
- * The active inputs registered with the scheduler.
- */
- private val _activeInputs = mutableListOf<Input>()
-
- /**
- * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
- */
- private var _inputArray = emptyArray<Input>()
-
- /**
- * The active outputs registered with the scheduler.
- */
- private val _activeOutputs = mutableListOf<Output>()
-
- /**
- * Flag to indicate that the scheduler is active.
- */
- private var _schedulerActive = false
-
- /**
- * The last convergence timestamp and the input.
- */
- private var _lastConverge: Long = Long.MIN_VALUE
- private var _lastConvergeInput: Input? = null
-
- /**
- * The simulation clock.
- */
- private val _clock = engine.clock
-
- /**
- * Register the specified [input] to this scheduler.
- */
- fun registerInput(input: Input) {
- _activeInputs.add(input)
- _inputArray = _activeInputs.toTypedArray()
-
- val hasActivationOutput = activationOutput != null
-
- // Disable timers and convergence of the source if one of the output manages it
- input.shouldConsumerConverge = !hasActivationOutput
- input.enableTimers = !hasActivationOutput
-
- if (input.isCoupled) {
- input.capacity = capacity
- }
-
- trigger(_clock.millis())
- }
-
- /**
- * De-register the specified [input] from this scheduler.
- */
- fun deregisterInput(input: Input, now: Long) {
- // Assign a new input responsible for handling the convergence events
- if (_lastConvergeInput == input) {
- _lastConvergeInput = null
- }
-
- _activeInputs.remove(input)
-
- // Re-run scheduler to distribute new load
- trigger(now)
- }
-
- /**
- * This method is invoked when one of the inputs converges.
- */
- fun convergeInput(input: Input, now: Long) {
- val lastConverge = _lastConverge
- val lastConvergeInput = _lastConvergeInput
- val parent = parent
-
- if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) {
- _lastConverge = now
- _lastConvergeInput = input
-
- parent.onConverge(now)
- }
- }
-
- /**
- * Register the specified [output] to this scheduler.
- */
- fun registerOutput(output: Output) {
- _activeOutputs.add(output)
-
- updateCapacity()
- updateActivationOutput()
- }
-
- /**
- * De-register the specified [output] from this scheduler.
- */
- fun deregisterOutput(output: Output, now: Long) {
- _activeOutputs.remove(output)
- updateCapacity()
-
- trigger(now)
- }
-
- /**
- * This method is invoked when one of the outputs converges.
- */
- fun convergeOutput(output: Output, now: Long) {
- val parent = parent
-
- if (parent != null) {
- _lastConverge = now
- parent.onConverge(now)
- }
-
- if (!output.isActive) {
- output.isActivationOutput = false
- updateActivationOutput()
- }
- }
-
- /**
- * Trigger the scheduler of the multiplexer.
- *
- * @param now The current virtual timestamp of the simulation.
- */
- fun trigger(now: Long) {
- if (_schedulerActive) {
- // No need to trigger the scheduler in case it is already active
- return
- }
-
- val activationOutput = activationOutput
-
- // We can run the scheduler in two ways:
- // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
- // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
- // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
- // a few inputs and little changes at the same timestamp.
- // We always pick for option (1) unless there are no outputs available.
- if (activationOutput != null) {
- activationOutput.pull(now)
- return
- } else {
- runScheduler(now)
- }
- }
-
- /**
- * Synchronously run the scheduler of the multiplexer.
- */
- fun runScheduler(now: Long): Long {
- return try {
- _schedulerActive = true
- doRunScheduler(now)
- } finally {
- _schedulerActive = false
- }
- }
-
- /**
- * Recompute the capacity of the multiplexer.
- */
- fun updateCapacity() {
- val newCapacity = _activeOutputs.sumOf(Output::capacity)
-
- // No-op if the capacity is unchanged
- if (capacity == newCapacity) {
- return
- }
-
- capacity = newCapacity
-
- for (input in _activeInputs) {
- if (input.isCoupled) {
- input.capacity = newCapacity
- }
- }
-
- // Sort outputs by their capacity
- _activeOutputs.sort()
- }
-
- /**
- * Updates the output that is used for scheduler activation.
- */
- private fun updateActivationOutput() {
- val output = _activeOutputs.firstOrNull()
- activationOutput = output
-
- if (output != null) {
- output.isActivationOutput = true
- }
-
- val hasActivationOutput = output != null
-
- for (input in _activeInputs) {
- input.shouldConsumerConverge = !hasActivationOutput
- input.enableTimers = !hasActivationOutput
- }
- }
-
- /**
- * Schedule the inputs over the outputs.
- *
- * @return The deadline after which a new scheduling cycle should start.
- */
- private fun doRunScheduler(now: Long): Long {
- val activeInputs = _activeInputs
- val activeOutputs = _activeOutputs
- var inputArray = _inputArray
- var inputSize = _inputArray.size
-
- // Update the counters of the scheduler
- updateCounters(now)
-
- // If there is no work yet, mark the inputs as idle.
- if (inputSize == 0) {
- demand = 0.0
- rate = 0.0
- return Long.MAX_VALUE
- }
-
- val capacity = capacity
- var availableCapacity = capacity
- var deadline = Long.MAX_VALUE
- var demand = 0.0
- var shouldRebuild = false
-
- // Pull in the work of the inputs
- for (i in 0 until inputSize) {
- val input = inputArray[i]
-
- input.pullSync(now)
-
- // Remove inputs that have finished
- if (!input.isActive) {
- input.actualRate = 0.0
- shouldRebuild = true
- } else {
- demand += input.limit
- deadline = min(deadline, input.deadline)
- }
- }
-
- // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
- if (shouldRebuild) {
- inputArray = activeInputs.toTypedArray()
- inputSize = inputArray.size
- _inputArray = inputArray
- }
-
- val rate = if (demand > capacity) {
- // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
- // constrained capacity across the inputs.
-
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- inputArray.sort()
-
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- for (i in 0 until inputSize) {
- val input = inputArray[i]
- val availableShare = availableCapacity / (inputSize - i)
- val grantedRate = min(input.allowedRate, availableShare)
-
- availableCapacity -= grantedRate
- input.actualRate = grantedRate
- }
-
- capacity - availableCapacity
- } else {
- demand
- }
-
- this.demand = demand
- if (this.rate != rate) {
- // Only update the outputs if the output rate has changed
- this.rate = rate
-
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- output.push(grantedSpeed)
- }
- }
-
- return deadline
- }
-
- /**
- * The previous capacity of the multiplexer.
- */
- private var _previousCapacity = 0.0
- private var _previousUpdate = Long.MIN_VALUE
-
- /**
- * Update the counters of the scheduler.
- */
- fun updateCounters(now: Long) {
- val previousCapacity = _previousCapacity
- _previousCapacity = capacity
-
- val previousUpdate = _previousUpdate
- _previousUpdate = now
-
- val delta = now - previousUpdate
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta * D_MS_TO_S
- val demand = demand
- val rate = rate
-
- counters.increment(
- demand = demand * deltaS,
- actual = rate * deltaS,
- remaining = (previousCapacity - rate) * deltaS
- )
- }
- }
-
- /**
- * An internal [FlowConsumer] implementation for multiplexer inputs.
- */
- private class Input(
- private val engine: FlowEngine,
- private val scheduler: Scheduler,
- @JvmField val isCoupled: Boolean,
- initialCapacity: Double
- ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
- /**
- * A flag to indicate that the consumer is active.
- */
- override val isActive: Boolean
- get() = _ctx != null
-
- /**
- * The demand of the consumer.
- */
- override val demand: Double
- get() = limit
-
- /**
- * The processing rate of the consumer.
- */
- override val rate: Double
- get() = actualRate
-
- /**
- * The capacity of the input.
- */
- override var capacity: Double
- get() = _capacity
- set(value) {
- allowedRate = min(limit, value)
- _capacity = value
- _ctx?.capacity = value
- }
- private var _capacity = initialCapacity
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- /**
- * A flag to enable timers for the input.
- */
- var enableTimers: Boolean = true
- set(value) {
- field = value
- _ctx?.enableTimers = value
- }
-
- /**
- * A flag to control whether the input should converge.
- */
- var shouldConsumerConverge: Boolean = true
- set(value) {
- field = value
- _ctx?.shouldConsumerConverge = value
- }
-
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- @JvmField var actualRate: Double = 0.0
-
- /**
- * The processing rate that is allowed by the model constraints.
- */
- @JvmField var allowedRate: Double = 0.0
-
- /**
- * The deadline of the input.
- */
- val deadline: Long
- get() = _ctx?.deadline ?: Long.MAX_VALUE
-
- /**
- * The [FlowConsumerContext] that is currently running.
- */
- private var _ctx: FlowConsumerContext? = null
-
- /**
- * A flag to indicate that the input is closed.
- */
- private var _isClosed: Boolean = false
-
- /**
- * Close the input.
- *
- * This method is invoked when the user removes an input from the switch.
- */
- fun close() {
- _isClosed = true
- cancel()
- }
-
- /**
- * Pull the source if necessary.
- */
- fun pullSync(now: Long) {
- _ctx?.pullSync(now)
- }
-
- /* FlowConsumer */
- override fun startConsumer(source: FlowSource) {
- check(!_isClosed) { "Cannot re-use closed input" }
- check(_ctx == null) { "Consumer is in invalid state" }
-
- val ctx = engine.newContext(source, this)
- _ctx = ctx
-
- ctx.capacity = capacity
- scheduler.registerInput(this)
-
- ctx.start()
- }
-
- override fun pull() {
- _ctx?.pull()
- }
-
- override fun cancel() {
- _ctx?.close()
- }
-
- /* FlowConsumerLogic */
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- rate: Double
- ) {
- doUpdateCounters(now)
-
- val allowed = min(rate, capacity)
- limit = rate
- actualRate = allowed
- allowedRate = allowed
-
- scheduler.trigger(now)
- }
-
- override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
- doUpdateCounters(now)
-
- limit = 0.0
- actualRate = 0.0
- allowedRate = 0.0
-
- scheduler.deregisterInput(this, now)
-
- _ctx = null
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long) {
- scheduler.convergeInput(this, now)
- }
-
- /* Comparable */
- override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
-
- /**
- * The timestamp that the counters where last updated.
- */
- private var _lastUpdate = Long.MIN_VALUE
-
- /**
- * Helper method to update the flow counters of the multiplexer.
- */
- fun doUpdateCounters(now: Long) {
- val lastUpdate = _lastUpdate
- _lastUpdate = now
-
- val delta = (now - lastUpdate).coerceAtLeast(0)
- if (delta <= 0L) {
- return
- }
-
- val actualRate = actualRate
-
- val deltaS = delta * D_MS_TO_S
- val demand = limit * deltaS
- val actual = actualRate * deltaS
- val remaining = (_capacity - actualRate) * deltaS
-
- _counters.increment(demand, actual, remaining)
- scheduler.counters.increment(0.0, 0.0, 0.0)
- }
- }
-
- /**
- * An internal [FlowSource] implementation for multiplexer outputs.
- */
- private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> {
- /**
- * The active [FlowConnection] of this source.
- */
- private var _conn: FlowConnection? = null
-
- /**
- * The capacity of this output.
- */
- @JvmField var capacity: Double = 0.0
-
- /**
- * A flag to indicate that this output is the activation output.
- */
- var isActivationOutput: Boolean
- get() = _isActivationOutput
- set(value) {
- _isActivationOutput = value
- _conn?.shouldSourceConverge = value
- }
- private var _isActivationOutput: Boolean = false
-
- /**
- * A flag to indicate that the output is active.
- */
- @JvmField var isActive = false
-
- /**
- * Push the specified rate to the consumer.
- */
- fun push(rate: Double) {
- _conn?.push(rate)
- }
-
- /**
- * Cancel this output.
- */
- fun cancel() {
- _conn?.close()
- }
-
- /**
- * Pull this output.
- */
- fun pull(now: Long) {
- _conn?.pull(now)
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- assert(_conn == null) { "Source running concurrently" }
- _conn = conn
- capacity = conn.capacity
- isActive = true
-
- scheduler.registerOutput(this)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _conn = null
- capacity = 0.0
- isActive = false
-
- scheduler.deregisterOutput(this, now)
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val capacity = capacity
- if (capacity != conn.capacity) {
- this.capacity = capacity
- scheduler.updateCapacity()
- }
-
- return if (_isActivationOutput) {
- // If this output is the activation output, synchronously run the scheduler and return the new deadline
- val deadline = scheduler.runScheduler(now)
- if (deadline == Long.MAX_VALUE) {
- deadline
- } else {
- deadline - now
- }
- } else {
- // Output is not the activation output, so trigger activation output and do not install timer for this
- // output (by returning `Long.MAX_VALUE`)
- scheduler.trigger(now)
-
- Long.MAX_VALUE
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- if (_isActivationOutput) {
- scheduler.convergeOutput(this, now)
- }
- }
-
- override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
deleted file mode 100644
index 6cfcc82c..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-import kotlin.math.roundToLong
-
-/**
- * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
- */
-public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
-
- init {
- require(amount >= 0.0) { "Amount must be positive" }
- require(utilization > 0.0) { "Utilization must be positive" }
- }
-
- private var remainingAmount = amount
- private var lastPull: Long = 0L
-
- override fun onStart(conn: FlowConnection, now: Long) {
- lastPull = now
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val lastPull = lastPull
- this.lastPull = now
- val delta = (now - lastPull).coerceAtLeast(0)
-
- val consumed = conn.rate * delta / 1000.0
- val limit = conn.capacity * utilization
-
- remainingAmount -= consumed
-
- val duration = (remainingAmount / limit * 1000).roundToLong()
-
- return if (duration > 0) {
- conn.push(limit)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
deleted file mode 100644
index b3191ad3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-/**
- * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
- * finish a pull, before proceeding its operation.
- */
-public class FlowSourceBarrier(public val parties: Int) {
- private var counter = 0
-
- /**
- * Enter the barrier and determine whether the caller is the last to reach the barrier.
- *
- * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
- */
- public fun enter(): Boolean {
- val last = ++counter == parties
- if (last) {
- counter = 0
- return true
- }
- return false
- }
-
- /**
- * Reset the barrier.
- */
- public fun reset() {
- counter = 0
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
deleted file mode 100644
index 80127fb5..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * Helper class to expose an observable [rate] field describing the flow rate of the source.
- */
-public class FlowSourceRateAdapter(
- private val delegate: FlowSource,
- private val callback: (Double) -> Unit = {}
-) : FlowSource by delegate {
- /**
- * The resource processing speed at this instant.
- */
- public var rate: Double = 0.0
- private set(value) {
- if (field != value) {
- callback(value)
- field = value
- }
- }
-
- init {
- callback(0.0)
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
-
- delegate.onStart(conn, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- try {
- delegate.onStop(conn, now)
- } finally {
- rate = 0.0
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return delegate.onPull(conn, now)
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- delegate.onConverge(conn, now)
- } finally {
- rate = conn.rate
- }
- }
-
- override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
deleted file mode 100644
index c9a52128..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
- */
-public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
- private var _iterator: Iterator<Fragment>? = null
- private var _nextTarget = Long.MIN_VALUE
-
- override fun onStart(conn: FlowConnection, now: Long) {
- check(_iterator == null) { "Source already running" }
- _iterator = trace.iterator()
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _iterator = null
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- // Check whether the trace fragment was fully consumed, otherwise wait until we have done so
- val nextTarget = _nextTarget
- if (nextTarget > now) {
- return now - nextTarget
- }
-
- val iterator = checkNotNull(_iterator)
- return if (iterator.hasNext()) {
- val fragment = iterator.next()
- _nextTarget = now + fragment.duration
- conn.push(fragment.usage)
- fragment.duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
-
- /**
- * A fragment of the trace.
- */
- public data class Fragment(val duration: Long, val usage: Double)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
deleted file mode 100644
index f89133dd..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowConsumerContextImpl] class.
- */
-class FlowConsumerContextTest {
- @Test
- fun testFlushWithoutCommand() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- engine.scheduleSync(engine.clock.millis(), context)
- }
-
- @Test
- fun testDoubleStart() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(0.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- context.start()
-
- assertThrows<IllegalStateException> {
- context.start()
- }
- }
-
- @Test
- fun testIdempotentCapacityChange() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
- context.capacity = 4200.0
- context.start()
- context.capacity = 4200.0
-
- verify(exactly = 1) { consumer.onPull(any(), any()) }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
deleted file mode 100644
index f75e5037..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Disabled
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowForwarder] class.
- */
-internal class FlowForwarderTest {
- @Test
- fun testCancelImmediately() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testCancel() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 10 * 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testState() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- assertFalse(forwarder.isActive)
-
- forwarder.startConsumer(consumer)
- assertTrue(forwarder.isActive)
-
- assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
-
- forwarder.cancel()
- assertFalse(forwarder.isActive)
-
- forwarder.close()
- assertFalse(forwarder.isActive)
- }
-
- @Test
- fun testCancelPendingDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
-
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.startConsumer(consumer)
- forwarder.cancel()
-
- verify(exactly = 0) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelStartedDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- forwarder.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- source.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testExitPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- source.startConsumer(forwarder)
- forwarder.consume(consumer)
- yield()
-
- assertFalse(forwarder.isActive)
- }
-
- @Test
- @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val sink = FlowSink(engine, 1.0)
-
- val source = spyk(FixedFlowSource(2.0, 1.0))
- sink.startConsumer(forwarder)
-
- coroutineScope {
- launch { forwarder.consume(source) }
- delay(1000)
- sink.capacity = 0.5
- }
-
- assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any()) }
- }
-
- @Test
- fun testCounters() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 1.0)
-
- val consumer = FixedFlowSource(2.0, 1.0)
- source.startConsumer(forwarder)
-
- forwarder.consume(consumer)
-
- yield()
-
- assertAll(
- { assertEquals(2.0, source.counters.actual) },
- { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
- { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
- { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
- { assertEquals(2000, clock.millis()) }
- )
- }
-
- @Test
- fun testCoupledExit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(FixedFlowSource(2000.0, 1.0))
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testPullFailureCoupled() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testStartFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-
- @Test
- fun testConvergeFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
deleted file mode 100644
index 746d752d..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowSink] class.
- */
-internal class FlowSinkTest {
- @Test
- fun testSpeed() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(4200.0, 1.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- @Test
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(engine, 1.0)
-
- val consumer = spyk(FixedFlowSource(2.0, 1.0))
-
- coroutineScope {
- launch { provider.consume(consumer) }
- delay(1000)
- provider.capacity = 0.5
- }
- assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any()) }
- }
-
- @Test
- fun testSpeedLimit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 2.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- /**
- * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
- * [FlowSource.onPull].
- */
- @Test
- fun testIntermediateInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.pull()
- }
- }
-
- provider.consume(consumer)
- }
-
- @Test
- fun testInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
- lateinit var resCtx: FlowConnection
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- resCtx = conn
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 4000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- launch {
- yield()
- resCtx.pull()
- }
- provider.consume(consumer)
-
- assertEquals(0, clock.millis())
- }
-
- @Test
- fun testFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Hi")
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testExceptionPropagationOnNext() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 1000
- } else {
- throw IllegalStateException()
- }
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testConcurrentConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- assertThrows<IllegalStateException> {
- coroutineScope {
- launch { provider.consume(consumer) }
- provider.consume(consumer)
- }
- }
- }
-
- @Test
- fun testCancelDuringConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- launch { provider.consume(consumer) }
- delay(500)
- provider.cancel()
-
- yield()
-
- assertEquals(500, clock.millis())
- }
-
- @Test
- fun testInfiniteSleep() {
- assertThrows<IllegalStateException> {
- runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
- }
-
- provider.consume(consumer)
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
deleted file mode 100644
index 2409e174..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [ForwardingFlowMultiplexer] class.
- */
-internal class ForwardingFlowMultiplexerTest {
- /**
- * Test a trace workload.
- */
- @Test
- fun testTrace() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val speed = mutableListOf<Double>()
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
- val forwarder = FlowForwarder(engine)
- val adapter = FlowSourceRateAdapter(forwarder, speed::add)
- source.startConsumer(adapter)
- forwarder.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertAll(
- { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
- { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
- )
- }
-
- /**
- * Test runtime workload on hypervisor.
- */
- @Test
- fun testRuntimeWorkload() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = FixedFlowSource(duration * 3.2, 1.0)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertEquals(duration, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test two workloads running sequentially.
- */
- @Test
- fun testTwoWorkloads() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- isFirst = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
- provider.consume(workload)
- assertEquals(duration * 2, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test concurrent workloads on the machine.
- */
- @Test
- fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- switch.newInput()
- assertThrows<IllegalStateException> { switch.newInput() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
deleted file mode 100644
index a6bf8ad8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [FlowMultiplexer] implementations
- */
-internal class MaxMinFlowMultiplexerTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(scheduler)
-
- val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { it.startConsumer(switch.newOutput()) }
-
- val provider = switch.newInput()
- val consumer = FixedFlowSource(2000.0, 1.0)
-
- try {
- provider.consume(consumer)
- yield()
- } finally {
- switch.clear()
- }
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with a single VM.
- */
- @Test
- fun testOvercommittedSingle() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val provider = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
- provider.consume(workload)
- yield()
- } finally {
- switch.clear()
- }
-
- assertAll(
- { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with two VMs.
- */
- @Test
- fun testOvercommittedDual() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workloadA =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
- val workloadB =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3100.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 73.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val providerA = switch.newInput()
- val providerB = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
-
- coroutineScope {
- launch { providerA.consume(workloadA) }
- providerB.consume(workloadB)
- }
-
- yield()
- } finally {
- switch.clear()
- }
- assertAll(
- { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
deleted file mode 100644
index 552579ff..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FixedFlowSource] class.
- */
-internal class FixedFlowSourceTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
-
- val consumer = FixedFlowSource(1.0, 1.0)
-
- provider.consume(consumer)
- assertEquals(1000, clock.millis())
- }
-
- @Test
- fun testUtilization() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
-
- val consumer = FixedFlowSource(1.0, 0.5)
-
- provider.consume(consumer)
- assertEquals(2000, clock.millis())
- }
-}