summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:32:05 +0200
committerGitHub <noreply@github.com>2022-10-21 22:32:05 +0200
commitfa7fdbb0126ea465130961dc37c4ef2d6feb36e9 (patch)
tree9cd46dd7970870b78990d6c35e8e2759d7cf5a13 /opendc-simulator/opendc-simulator-flow/src/test
parent29beb50018cf2ad87b252c6c080f8c5de4600349 (diff)
parent290e1fe14460d91e4703e55ac5f05dbe7b4505f7 (diff)
merge: Implement multi-flow stages in simulator (#110)
This pull request introduces the new `flow2` multi-flow simulator into the OpenDC codebase and adjust all existing modules to make use of this new simulator. The new simulator models flow as a network of components, which can each receive flow from (potentially) multiple other components. In the previous simulator, the framework itself supported only single flows between components and required re-implementation of many components to support multiplexing flows. Initial benchmarks show performance improvements in the range 2x–4x for large scale experiments such as the Capelin benchmarks. ## Implementation Notes :hammer_and_pick: * Add support for multi-flow stages * Support flow transformations * Add forwarding flow multiplexer * Expose metrics on FlowMultiplexer * Re-implement network sim using flow2 * Re-implement power sim using flow2 * Re-implement compute sim using flow2 * Optimize workload implementation of SimTrace * Remove old flow simulator * Add log4j-core dependency ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Removal of the `org.opendc.simulator.flow` package. You should now use the new flow simulator located in `org.opendc.simulator.flow2`. * `PowerModel` interface is replaced by the `CpuPowerModel` interface. * `PowerDriver` interface is replaced by the `SimPsu` and `SimPsuFactory` interfaces. * Removal of `SimTraceWorkload`. Instead, create a workload from a `SimTrace` using `createWorkload(offset)`. * `ScalingGovernor` has been split in a `ScalingGovernor` and `ScalingGovernorFactory`. * All modules in `opendc-simulator` are now written in Java. This means that default parameters are not supported anymore for these modules.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/test')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt107
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt331
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt245
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt158
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt150
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt197
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt385
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt71
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt66
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt)39
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt124
11 files changed, 861 insertions, 1012 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
deleted file mode 100644
index f89133dd..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowConsumerContextImpl] class.
- */
-class FlowConsumerContextTest {
- @Test
- fun testFlushWithoutCommand() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- engine.scheduleSync(engine.clock.millis(), context)
- }
-
- @Test
- fun testDoubleStart() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(0.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- context.start()
-
- assertThrows<IllegalStateException> {
- context.start()
- }
- }
-
- @Test
- fun testIdempotentCapacityChange() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
- context.capacity = 4200.0
- context.start()
- context.capacity = 4200.0
-
- verify(exactly = 1) { consumer.onPull(any(), any()) }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
deleted file mode 100644
index f75e5037..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Disabled
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowForwarder] class.
- */
-internal class FlowForwarderTest {
- @Test
- fun testCancelImmediately() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testCancel() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 10 * 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testState() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- assertFalse(forwarder.isActive)
-
- forwarder.startConsumer(consumer)
- assertTrue(forwarder.isActive)
-
- assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
-
- forwarder.cancel()
- assertFalse(forwarder.isActive)
-
- forwarder.close()
- assertFalse(forwarder.isActive)
- }
-
- @Test
- fun testCancelPendingDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
-
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.startConsumer(consumer)
- forwarder.cancel()
-
- verify(exactly = 0) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelStartedDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- forwarder.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- source.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testExitPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- source.startConsumer(forwarder)
- forwarder.consume(consumer)
- yield()
-
- assertFalse(forwarder.isActive)
- }
-
- @Test
- @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val sink = FlowSink(engine, 1.0)
-
- val source = spyk(FixedFlowSource(2.0, 1.0))
- sink.startConsumer(forwarder)
-
- coroutineScope {
- launch { forwarder.consume(source) }
- delay(1000)
- sink.capacity = 0.5
- }
-
- assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any()) }
- }
-
- @Test
- fun testCounters() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 1.0)
-
- val consumer = FixedFlowSource(2.0, 1.0)
- source.startConsumer(forwarder)
-
- forwarder.consume(consumer)
-
- yield()
-
- assertAll(
- { assertEquals(2.0, source.counters.actual) },
- { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
- { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
- { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
- { assertEquals(2000, clock.millis()) }
- )
- }
-
- @Test
- fun testCoupledExit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(FixedFlowSource(2000.0, 1.0))
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testPullFailureCoupled() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testStartFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-
- @Test
- fun testConvergeFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
deleted file mode 100644
index 746d752d..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowSink] class.
- */
-internal class FlowSinkTest {
- @Test
- fun testSpeed() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(4200.0, 1.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- @Test
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(engine, 1.0)
-
- val consumer = spyk(FixedFlowSource(2.0, 1.0))
-
- coroutineScope {
- launch { provider.consume(consumer) }
- delay(1000)
- provider.capacity = 0.5
- }
- assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any()) }
- }
-
- @Test
- fun testSpeedLimit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 2.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- /**
- * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
- * [FlowSource.onPull].
- */
- @Test
- fun testIntermediateInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.pull()
- }
- }
-
- provider.consume(consumer)
- }
-
- @Test
- fun testInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
- lateinit var resCtx: FlowConnection
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- resCtx = conn
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 4000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- launch {
- yield()
- resCtx.pull()
- }
- provider.consume(consumer)
-
- assertEquals(0, clock.millis())
- }
-
- @Test
- fun testFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Hi")
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testExceptionPropagationOnNext() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 1000
- } else {
- throw IllegalStateException()
- }
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testConcurrentConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- assertThrows<IllegalStateException> {
- coroutineScope {
- launch { provider.consume(consumer) }
- provider.consume(consumer)
- }
- }
- }
-
- @Test
- fun testCancelDuringConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- launch { provider.consume(consumer) }
- delay(500)
- provider.cancel()
-
- yield()
-
- assertEquals(500, clock.millis())
- }
-
- @Test
- fun testInfiniteSleep() {
- assertThrows<IllegalStateException> {
- runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
- }
-
- provider.consume(consumer)
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
deleted file mode 100644
index 2409e174..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [ForwardingFlowMultiplexer] class.
- */
-internal class ForwardingFlowMultiplexerTest {
- /**
- * Test a trace workload.
- */
- @Test
- fun testTrace() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val speed = mutableListOf<Double>()
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
- val forwarder = FlowForwarder(engine)
- val adapter = FlowSourceRateAdapter(forwarder, speed::add)
- source.startConsumer(adapter)
- forwarder.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertAll(
- { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
- { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
- )
- }
-
- /**
- * Test runtime workload on hypervisor.
- */
- @Test
- fun testRuntimeWorkload() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = FixedFlowSource(duration * 3.2, 1.0)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertEquals(duration, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test two workloads running sequentially.
- */
- @Test
- fun testTwoWorkloads() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- isFirst = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
- provider.consume(workload)
- assertEquals(duration * 2, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test concurrent workloads on the machine.
- */
- @Test
- fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- switch.newInput()
- assertThrows<IllegalStateException> { switch.newInput() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
deleted file mode 100644
index a6bf8ad8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [FlowMultiplexer] implementations
- */
-internal class MaxMinFlowMultiplexerTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(scheduler)
-
- val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { it.startConsumer(switch.newOutput()) }
-
- val provider = switch.newInput()
- val consumer = FixedFlowSource(2000.0, 1.0)
-
- try {
- provider.consume(consumer)
- yield()
- } finally {
- switch.clear()
- }
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with a single VM.
- */
- @Test
- fun testOvercommittedSingle() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val provider = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
- provider.consume(workload)
- yield()
- } finally {
- switch.clear()
- }
-
- assertAll(
- { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with two VMs.
- */
- @Test
- fun testOvercommittedDual() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workloadA =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
- val workloadB =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3100.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 73.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val providerA = switch.newInput()
- val providerB = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
-
- coroutineScope {
- launch { providerA.consume(workloadA) }
- providerB.consume(workloadB)
- }
-
- yield()
- } finally {
- switch.clear()
- }
- assertAll(
- { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
new file mode 100644
index 00000000..839835ce
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Smoke tests for the Flow API.
+ */
+class FlowEngineTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+ val sink = SimpleFlowSink(graph, 2.0f)
+
+ graph.connect(multiplexer.newOutput(), sink.input)
+
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(sourceA.output, multiplexer.newInput())
+ graph.connect(sourceB.output, multiplexer.newInput())
+ }
+
+ @Test
+ fun testConnectInvalidInlet() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val inlet = mockk<Inlet>()
+ val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ assertThrows<IllegalArgumentException> { graph.connect(source.output, inlet) }
+ }
+
+ @Test
+ fun testConnectInvalidOutlet() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val outlet = mockk<Outlet>()
+ val sink = SimpleFlowSink(graph, 2.0f)
+ assertThrows<IllegalArgumentException> { graph.connect(outlet, sink.input) }
+ }
+
+ @Test
+ fun testConnectInletBelongsToDifferentGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphB, 2.0f)
+ val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphA, 2.0f)
+ val source = SimpleFlowSource(graphB, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectInletAlreadyConnected() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 2.0f)
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(sourceA.output, sink.input)
+ assertThrows<IllegalStateException> { graph.connect(sourceB.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectOutletAlreadyConnected() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sinkA = SimpleFlowSink(graph, 2.0f)
+ val sinkB = SimpleFlowSink(graph, 2.0f)
+ val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(source.output, sinkA.input)
+ assertThrows<IllegalStateException> { graph.connect(source.output, sinkB.input) }
+ }
+
+ @Test
+ fun testDisconnectInletInvalid() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val inlet = mockk<Inlet>()
+ assertThrows<IllegalArgumentException> { graph.disconnect(inlet) }
+ }
+
+ @Test
+ fun testDisconnectOutletInvalid() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val outlet = mockk<Outlet>()
+ assertThrows<IllegalArgumentException> { graph.disconnect(outlet) }
+ }
+
+ @Test
+ fun testDisconnectInletInvalidGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphA, 2.0f)
+
+ assertThrows<IllegalArgumentException> { graphB.disconnect(sink.input) }
+ }
+
+ @Test
+ fun testDisconnectOutletInvalidGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphB.disconnect(source.output) }
+ }
+
+ @Test
+ fun testInletEquality() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sinkA = SimpleFlowSink(graph, 2.0f)
+ val sinkB = SimpleFlowSink(graph, 2.0f)
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+
+ assertEquals(sinkA.input, sinkA.input)
+ assertNotEquals(sinkA.input, sinkB.input)
+
+ assertNotEquals(multiplexer.newInput(), multiplexer.newInput())
+ }
+
+ @Test
+ fun testOutletEquality() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+
+ assertEquals(sourceA.output, sourceA.output)
+ assertNotEquals(sourceA.output, sourceB.output)
+
+ assertNotEquals(multiplexer.newOutput(), multiplexer.newOutput())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
new file mode 100644
index 00000000..1824959c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
@@ -0,0 +1,385 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertAll
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [FlowTimerQueue] class.
+ */
+class FlowTimerQueueTest {
+ private lateinit var queue: FlowTimerQueue
+
+ @BeforeEach
+ fun setUp() {
+ queue = FlowTimerQueue(3)
+ }
+
+ /**
+ * Test whether a call to [FlowTimerQueue.poll] returns `null` for an empty queue.
+ */
+ @Test
+ fun testPollEmpty() {
+ assertAll(
+ { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test whether a call to [FlowTimerQueue.poll] returns the proper value for a queue with a single entry.
+ */
+ @Test
+ fun testSingleEntry() {
+ val entry = mockk<FlowStage>()
+ entry.deadline = 100
+ entry.timerIndex = -1
+
+ queue.enqueue(entry)
+
+ assertAll(
+ { assertEquals(100, queue.peekDeadline()) },
+ { assertNull(queue.poll(10L)) },
+ { assertEquals(entry, queue.poll(200L)) },
+ { assertNull(queue.poll(200L)) }
+ )
+ }
+
+ /**
+ * Test whether [FlowTimerQueue.poll] returns values in the queue in the proper order.
+ */
+ @Test
+ fun testMultipleEntries() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 10
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that the queue is properly resized when the number of entries exceed the capacity.
+ */
+ @Test
+ fun testResize() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ val entryD = mockk<FlowStage>()
+ entryD.deadline = 31
+ entryD.timerIndex = -1
+
+ queue.enqueue(entryD)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryD, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test to verify that we can change the deadline of the last element in the queue.
+ */
+ @Test
+ fun testChangeDeadlineTail() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryA.deadline = 10
+ queue.enqueue(entryA)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can change the deadline of the head entry in the queue.
+ */
+ @Test
+ fun testChangeDeadlineMiddle() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = 10
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can change the deadline of the head entry in the queue.
+ */
+ @Test
+ fun testChangeDeadlineHead() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryB.deadline = 30
+ queue.enqueue(entryB)
+
+ assertAll(
+ { assertEquals(30, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that an unchanged deadline results in a no-op.
+ */
+ @Test
+ fun testChangeDeadlineNop() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ // Should be a no-op
+ queue.enqueue(entryA)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the end of the queue.
+ */
+ @Test
+ fun testRemoveEntryTail() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = Long.MAX_VALUE
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the head of the queue.
+ */
+ @Test
+ fun testRemoveEntryHead() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryB.deadline = Long.MAX_VALUE
+ queue.enqueue(entryB)
+
+ assertAll(
+ { assertEquals(58, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the middle of a queue.
+ */
+ @Test
+ fun testRemoveEntryMiddle() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = Long.MAX_VALUE
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
new file mode 100644
index 00000000..2250fe87
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [InvocationStack] class.
+ */
+class InvocationStackTest {
+ private val stack = InvocationStack(2)
+
+ @Test
+ fun testPollEmpty() {
+ assertEquals(Long.MAX_VALUE, stack.poll())
+ }
+
+ @Test
+ fun testAddSingle() {
+ assertTrue(stack.tryAdd(10))
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testAddLater() {
+ assertTrue(stack.tryAdd(10))
+ assertFalse(stack.tryAdd(15))
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testAddEarlier() {
+ assertTrue(stack.tryAdd(10))
+ assertTrue(stack.tryAdd(5))
+ assertEquals(5, stack.poll())
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testCapacityExceeded() {
+ assertTrue(stack.tryAdd(10))
+ assertTrue(stack.tryAdd(5))
+ assertTrue(stack.tryAdd(2))
+ assertEquals(2, stack.poll())
+ assertEquals(5, stack.poll())
+ assertEquals(10, stack.poll())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
new file mode 100644
index 00000000..a2ed2195
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.TraceFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for the [ForwardingFlowMultiplexer] class.
+ */
+class ForwardingFlowMultiplexerTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val switch = ForwardingFlowMultiplexer(graph)
+ val sink = SimpleFlowSink(graph, 3200.0f)
+ graph.connect(switch.newOutput(), sink.input)
+
+ val workload =
+ TraceFlowSource(
+ graph,
+ TraceFlowSource.Trace(
+ longArrayOf(1000, 2000, 3000, 4000),
+ floatArrayOf(28.0f, 3500.0f, 0.0f, 183.0f),
+ 4
+ )
+ )
+ graph.connect(workload.output, switch.newInput())
+
+ advanceUntilIdle()
+
+ assertAll(
+ { assertEquals(4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
index 552579ff..ba339ee3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,38 +20,35 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow.source
+package org.opendc.simulator.flow2.mux
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.SimpleFlowSource
import org.opendc.simulator.kotlin.runSimulation
/**
- * A test suite for the [FixedFlowSource] class.
+ * Test suite for the [MaxMinFlowMultiplexer] class.
*/
-internal class FixedFlowSourceTest {
+class MaxMinFlowMultiplexerTest {
@Test
fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val switch = MaxMinFlowMultiplexer(graph)
- val consumer = FixedFlowSource(1.0, 1.0)
+ val sinks = List(2) { SimpleFlowSink(graph, 2000.0f) }
+ for (source in sinks) {
+ graph.connect(switch.newOutput(), source.input)
+ }
- provider.consume(consumer)
- assertEquals(1000, clock.millis())
- }
-
- @Test
- fun testUtilization() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
+ val source = SimpleFlowSource(graph, 2000.0f, 1.0f)
+ graph.connect(source.output, switch.newInput())
- val consumer = FixedFlowSource(1.0, 0.5)
+ advanceUntilIdle()
- provider.consume(consumer)
- assertEquals(2000, clock.millis())
+ assertEquals(500, clock.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
new file mode 100644
index 00000000..a75efba3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.sink
+
+import kotlinx.coroutines.delay
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.flow2.source.TraceFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+import java.util.concurrent.ThreadLocalRandom
+
+/**
+ * Test suite for the [SimpleFlowSink] class.
+ */
+class FlowSinkTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 1.0f)
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testAdjustCapacity() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 1.0f)
+
+ graph.connect(source.output, sink.input)
+
+ delay(1000)
+ sink.capacity = 0.5f
+
+ advanceUntilIdle()
+
+ assertEquals(3000, clock.millis())
+ }
+
+ @Test
+ fun testUtilization() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 0.5f)
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(4000, clock.millis())
+ }
+
+ @Test
+ fun testFragments() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val trace = TraceFlowSource.Trace(
+ longArrayOf(1000, 2000, 3000, 4000),
+ floatArrayOf(1.0f, 0.5f, 2.0f, 1.0f),
+ 4
+ )
+ val source = TraceFlowSource(
+ graph,
+ trace
+ )
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(4000, clock.millis())
+ }
+
+ @Test
+ fun benchmarkSink() {
+ val random = ThreadLocalRandom.current()
+ val traceSize = 10000000
+ val trace = TraceFlowSource.Trace(
+ LongArray(traceSize) { it * 1000L },
+ FloatArray(traceSize) { random.nextDouble(0.0, 4500.0).toFloat() },
+ traceSize
+ )
+
+ return runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val sink = SimpleFlowSink(graph, 4200.0f)
+ val source = TraceFlowSource(graph, trace)
+ graph.connect(source.output, sink.input)
+ }
+ }
+}