summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-08-21 14:27:41 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:13:03 +0200
commitad88144923d76dfc421f0b22a0b4e670b3f6366e (patch)
treed1721cfc33dd76a0eb13c0c00f8a3320f7652863 /opendc-simulator/opendc-simulator-flow/src/test
parenta832ea376e360f3029036a9570c244fb9080e91f (diff)
perf(sim/flow): Add support for multi-flow stages
This change adds support for creating nodes in a flow graph that support multiple inputs and outputs directly, instead of our current approach where we need to re-implement the `FlowConsumerContext` interface in order to support multiple inputs or outputs.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/test')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt197
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt385
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt71
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt124
5 files changed, 831 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
new file mode 100644
index 00000000..839835ce
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Smoke tests for the Flow API.
+ */
+class FlowEngineTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+ val sink = SimpleFlowSink(graph, 2.0f)
+
+ graph.connect(multiplexer.newOutput(), sink.input)
+
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(sourceA.output, multiplexer.newInput())
+ graph.connect(sourceB.output, multiplexer.newInput())
+ }
+
+ @Test
+ fun testConnectInvalidInlet() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val inlet = mockk<Inlet>()
+ val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ assertThrows<IllegalArgumentException> { graph.connect(source.output, inlet) }
+ }
+
+ @Test
+ fun testConnectInvalidOutlet() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val outlet = mockk<Outlet>()
+ val sink = SimpleFlowSink(graph, 2.0f)
+ assertThrows<IllegalArgumentException> { graph.connect(outlet, sink.input) }
+ }
+
+ @Test
+ fun testConnectInletBelongsToDifferentGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphB, 2.0f)
+ val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphA, 2.0f)
+ val source = SimpleFlowSource(graphB, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectInletAlreadyConnected() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 2.0f)
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(sourceA.output, sink.input)
+ assertThrows<IllegalStateException> { graph.connect(sourceB.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectOutletAlreadyConnected() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sinkA = SimpleFlowSink(graph, 2.0f)
+ val sinkB = SimpleFlowSink(graph, 2.0f)
+ val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(source.output, sinkA.input)
+ assertThrows<IllegalStateException> { graph.connect(source.output, sinkB.input) }
+ }
+
+ @Test
+ fun testDisconnectInletInvalid() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val inlet = mockk<Inlet>()
+ assertThrows<IllegalArgumentException> { graph.disconnect(inlet) }
+ }
+
+ @Test
+ fun testDisconnectOutletInvalid() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val outlet = mockk<Outlet>()
+ assertThrows<IllegalArgumentException> { graph.disconnect(outlet) }
+ }
+
+ @Test
+ fun testDisconnectInletInvalidGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphA, 2.0f)
+
+ assertThrows<IllegalArgumentException> { graphB.disconnect(sink.input) }
+ }
+
+ @Test
+ fun testDisconnectOutletInvalidGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphB.disconnect(source.output) }
+ }
+
+ @Test
+ fun testInletEquality() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sinkA = SimpleFlowSink(graph, 2.0f)
+ val sinkB = SimpleFlowSink(graph, 2.0f)
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+
+ assertEquals(sinkA.input, sinkA.input)
+ assertNotEquals(sinkA.input, sinkB.input)
+
+ assertNotEquals(multiplexer.newInput(), multiplexer.newInput())
+ }
+
+ @Test
+ fun testOutletEquality() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+
+ assertEquals(sourceA.output, sourceA.output)
+ assertNotEquals(sourceA.output, sourceB.output)
+
+ assertNotEquals(multiplexer.newOutput(), multiplexer.newOutput())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
new file mode 100644
index 00000000..1824959c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
@@ -0,0 +1,385 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertAll
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [FlowTimerQueue] class.
+ */
+class FlowTimerQueueTest {
+ private lateinit var queue: FlowTimerQueue
+
+ @BeforeEach
+ fun setUp() {
+ queue = FlowTimerQueue(3)
+ }
+
+ /**
+ * Test whether a call to [FlowTimerQueue.poll] returns `null` for an empty queue.
+ */
+ @Test
+ fun testPollEmpty() {
+ assertAll(
+ { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test whether a call to [FlowTimerQueue.poll] returns the proper value for a queue with a single entry.
+ */
+ @Test
+ fun testSingleEntry() {
+ val entry = mockk<FlowStage>()
+ entry.deadline = 100
+ entry.timerIndex = -1
+
+ queue.enqueue(entry)
+
+ assertAll(
+ { assertEquals(100, queue.peekDeadline()) },
+ { assertNull(queue.poll(10L)) },
+ { assertEquals(entry, queue.poll(200L)) },
+ { assertNull(queue.poll(200L)) }
+ )
+ }
+
+ /**
+ * Test whether [FlowTimerQueue.poll] returns values in the queue in the proper order.
+ */
+ @Test
+ fun testMultipleEntries() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 10
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that the queue is properly resized when the number of entries exceed the capacity.
+ */
+ @Test
+ fun testResize() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ val entryD = mockk<FlowStage>()
+ entryD.deadline = 31
+ entryD.timerIndex = -1
+
+ queue.enqueue(entryD)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryD, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test to verify that we can change the deadline of the last element in the queue.
+ */
+ @Test
+ fun testChangeDeadlineTail() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryA.deadline = 10
+ queue.enqueue(entryA)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can change the deadline of the head entry in the queue.
+ */
+ @Test
+ fun testChangeDeadlineMiddle() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = 10
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can change the deadline of the head entry in the queue.
+ */
+ @Test
+ fun testChangeDeadlineHead() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryB.deadline = 30
+ queue.enqueue(entryB)
+
+ assertAll(
+ { assertEquals(30, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that an unchanged deadline results in a no-op.
+ */
+ @Test
+ fun testChangeDeadlineNop() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ // Should be a no-op
+ queue.enqueue(entryA)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the end of the queue.
+ */
+ @Test
+ fun testRemoveEntryTail() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = Long.MAX_VALUE
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the head of the queue.
+ */
+ @Test
+ fun testRemoveEntryHead() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryB.deadline = Long.MAX_VALUE
+ queue.enqueue(entryB)
+
+ assertAll(
+ { assertEquals(58, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the middle of a queue.
+ */
+ @Test
+ fun testRemoveEntryMiddle() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = Long.MAX_VALUE
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
new file mode 100644
index 00000000..2250fe87
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [InvocationStack] class.
+ */
+class InvocationStackTest {
+ private val stack = InvocationStack(2)
+
+ @Test
+ fun testPollEmpty() {
+ assertEquals(Long.MAX_VALUE, stack.poll())
+ }
+
+ @Test
+ fun testAddSingle() {
+ assertTrue(stack.tryAdd(10))
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testAddLater() {
+ assertTrue(stack.tryAdd(10))
+ assertFalse(stack.tryAdd(15))
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testAddEarlier() {
+ assertTrue(stack.tryAdd(10))
+ assertTrue(stack.tryAdd(5))
+ assertEquals(5, stack.poll())
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testCapacityExceeded() {
+ assertTrue(stack.tryAdd(10))
+ assertTrue(stack.tryAdd(5))
+ assertTrue(stack.tryAdd(2))
+ assertEquals(2, stack.poll())
+ assertEquals(5, stack.poll())
+ assertEquals(10, stack.poll())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
new file mode 100644
index 00000000..ba339ee3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for the [MaxMinFlowMultiplexer] class.
+ */
+class MaxMinFlowMultiplexerTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val switch = MaxMinFlowMultiplexer(graph)
+
+ val sinks = List(2) { SimpleFlowSink(graph, 2000.0f) }
+ for (source in sinks) {
+ graph.connect(switch.newOutput(), source.input)
+ }
+
+ val source = SimpleFlowSource(graph, 2000.0f, 1.0f)
+ graph.connect(source.output, switch.newInput())
+
+ advanceUntilIdle()
+
+ assertEquals(500, clock.millis())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
new file mode 100644
index 00000000..a75efba3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.sink
+
+import kotlinx.coroutines.delay
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.flow2.source.TraceFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+import java.util.concurrent.ThreadLocalRandom
+
+/**
+ * Test suite for the [SimpleFlowSink] class.
+ */
+class FlowSinkTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 1.0f)
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testAdjustCapacity() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 1.0f)
+
+ graph.connect(source.output, sink.input)
+
+ delay(1000)
+ sink.capacity = 0.5f
+
+ advanceUntilIdle()
+
+ assertEquals(3000, clock.millis())
+ }
+
+ @Test
+ fun testUtilization() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 0.5f)
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(4000, clock.millis())
+ }
+
+ @Test
+ fun testFragments() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val trace = TraceFlowSource.Trace(
+ longArrayOf(1000, 2000, 3000, 4000),
+ floatArrayOf(1.0f, 0.5f, 2.0f, 1.0f),
+ 4
+ )
+ val source = TraceFlowSource(
+ graph,
+ trace
+ )
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(4000, clock.millis())
+ }
+
+ @Test
+ fun benchmarkSink() {
+ val random = ThreadLocalRandom.current()
+ val traceSize = 10000000
+ val trace = TraceFlowSource.Trace(
+ LongArray(traceSize) { it * 1000L },
+ FloatArray(traceSize) { random.nextDouble(0.0, 4500.0).toFloat() },
+ traceSize
+ )
+
+ return runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val sink = SimpleFlowSink(graph, 4200.0f)
+ val source = TraceFlowSource(graph, trace)
+ graph.connect(source.output, sink.input)
+ }
+ }
+}