diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-08-21 14:27:41 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-21 22:13:03 +0200 |
| commit | ad88144923d76dfc421f0b22a0b4e670b3f6366e (patch) | |
| tree | d1721cfc33dd76a0eb13c0c00f8a3320f7652863 /opendc-simulator/opendc-simulator-flow/src/test | |
| parent | a832ea376e360f3029036a9570c244fb9080e91f (diff) | |
perf(sim/flow): Add support for multi-flow stages
This change adds support for creating nodes in a flow graph that support
multiple inputs and outputs directly, instead of our current approach
where we need to re-implement the `FlowConsumerContext` interface in
order to support multiple inputs or outputs.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/test')
5 files changed, 831 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt new file mode 100644 index 00000000..839835ce --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.SimpleFlowSource +import org.opendc.simulator.kotlin.runSimulation + +/** + * Smoke tests for the Flow API. + */ +class FlowEngineTest { + @Test + fun testSmoke() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val multiplexer = MaxMinFlowMultiplexer(graph) + val sink = SimpleFlowSink(graph, 2.0f) + + graph.connect(multiplexer.newOutput(), sink.input) + + val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) + val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f) + + graph.connect(sourceA.output, multiplexer.newInput()) + graph.connect(sourceB.output, multiplexer.newInput()) + } + + @Test + fun testConnectInvalidInlet() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val inlet = mockk<Inlet>() + val source = SimpleFlowSource(graph, 2000.0f, 0.8f) + assertThrows<IllegalArgumentException> { graph.connect(source.output, inlet) } + } + + @Test + fun testConnectInvalidOutlet() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val outlet = mockk<Outlet>() + val sink = SimpleFlowSink(graph, 2.0f) + assertThrows<IllegalArgumentException> { graph.connect(outlet, sink.input) } + } + + @Test + fun testConnectInletBelongsToDifferentGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val sink = SimpleFlowSink(graphB, 2.0f) + val source = SimpleFlowSource(graphA, 2000.0f, 0.8f) + + assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) } + } + + @Test + fun testConnectOutletBelongsToDifferentGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val sink = SimpleFlowSink(graphA, 2.0f) + val source = SimpleFlowSource(graphB, 2000.0f, 0.8f) + + assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) } + } + + @Test + fun testConnectInletAlreadyConnected() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 2.0f) + val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) + val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f) + + graph.connect(sourceA.output, sink.input) + assertThrows<IllegalStateException> { graph.connect(sourceB.output, sink.input) } + } + + @Test + fun testConnectOutletAlreadyConnected() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sinkA = SimpleFlowSink(graph, 2.0f) + val sinkB = SimpleFlowSink(graph, 2.0f) + val source = SimpleFlowSource(graph, 2000.0f, 0.8f) + + graph.connect(source.output, sinkA.input) + assertThrows<IllegalStateException> { graph.connect(source.output, sinkB.input) } + } + + @Test + fun testDisconnectInletInvalid() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val inlet = mockk<Inlet>() + assertThrows<IllegalArgumentException> { graph.disconnect(inlet) } + } + + @Test + fun testDisconnectOutletInvalid() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val outlet = mockk<Outlet>() + assertThrows<IllegalArgumentException> { graph.disconnect(outlet) } + } + + @Test + fun testDisconnectInletInvalidGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val sink = SimpleFlowSink(graphA, 2.0f) + + assertThrows<IllegalArgumentException> { graphB.disconnect(sink.input) } + } + + @Test + fun testDisconnectOutletInvalidGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val source = SimpleFlowSource(graphA, 2000.0f, 0.8f) + + assertThrows<IllegalArgumentException> { graphB.disconnect(source.output) } + } + + @Test + fun testInletEquality() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sinkA = SimpleFlowSink(graph, 2.0f) + val sinkB = SimpleFlowSink(graph, 2.0f) + + val multiplexer = MaxMinFlowMultiplexer(graph) + + assertEquals(sinkA.input, sinkA.input) + assertNotEquals(sinkA.input, sinkB.input) + + assertNotEquals(multiplexer.newInput(), multiplexer.newInput()) + } + + @Test + fun testOutletEquality() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) + val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f) + + val multiplexer = MaxMinFlowMultiplexer(graph) + + assertEquals(sourceA.output, sourceA.output) + assertNotEquals(sourceA.output, sourceB.output) + + assertNotEquals(multiplexer.newOutput(), multiplexer.newOutput()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt new file mode 100644 index 00000000..1824959c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt @@ -0,0 +1,385 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertAll +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * Test suite for the [FlowTimerQueue] class. + */ +class FlowTimerQueueTest { + private lateinit var queue: FlowTimerQueue + + @BeforeEach + fun setUp() { + queue = FlowTimerQueue(3) + } + + /** + * Test whether a call to [FlowTimerQueue.poll] returns `null` for an empty queue. + */ + @Test + fun testPollEmpty() { + assertAll( + { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test whether a call to [FlowTimerQueue.poll] returns the proper value for a queue with a single entry. + */ + @Test + fun testSingleEntry() { + val entry = mockk<FlowStage>() + entry.deadline = 100 + entry.timerIndex = -1 + + queue.enqueue(entry) + + assertAll( + { assertEquals(100, queue.peekDeadline()) }, + { assertNull(queue.poll(10L)) }, + { assertEquals(entry, queue.poll(200L)) }, + { assertNull(queue.poll(200L)) } + ) + } + + /** + * Test whether [FlowTimerQueue.poll] returns values in the queue in the proper order. + */ + @Test + fun testMultipleEntries() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 10 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + assertAll( + { assertEquals(10, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that the queue is properly resized when the number of entries exceed the capacity. + */ + @Test + fun testResize() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + val entryD = mockk<FlowStage>() + entryD.deadline = 31 + entryD.timerIndex = -1 + + queue.enqueue(entryD) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryD, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test to verify that we can change the deadline of the last element in the queue. + */ + @Test + fun testChangeDeadlineTail() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryA.deadline = 10 + queue.enqueue(entryA) + + assertAll( + { assertEquals(10, queue.peekDeadline()) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can change the deadline of the head entry in the queue. + */ + @Test + fun testChangeDeadlineMiddle() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryC.deadline = 10 + queue.enqueue(entryC) + + assertAll( + { assertEquals(10, queue.peekDeadline()) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can change the deadline of the head entry in the queue. + */ + @Test + fun testChangeDeadlineHead() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryB.deadline = 30 + queue.enqueue(entryB) + + assertAll( + { assertEquals(30, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that an unchanged deadline results in a no-op. + */ + @Test + fun testChangeDeadlineNop() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + // Should be a no-op + queue.enqueue(entryA) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can remove an entry from the end of the queue. + */ + @Test + fun testRemoveEntryTail() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryC.deadline = Long.MAX_VALUE + queue.enqueue(entryC) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can remove an entry from the head of the queue. + */ + @Test + fun testRemoveEntryHead() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryB.deadline = Long.MAX_VALUE + queue.enqueue(entryB) + + assertAll( + { assertEquals(58, queue.peekDeadline()) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can remove an entry from the middle of a queue. + */ + @Test + fun testRemoveEntryMiddle() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryC.deadline = Long.MAX_VALUE + queue.enqueue(entryC) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt new file mode 100644 index 00000000..2250fe87 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +/** + * Test suite for the [InvocationStack] class. + */ +class InvocationStackTest { + private val stack = InvocationStack(2) + + @Test + fun testPollEmpty() { + assertEquals(Long.MAX_VALUE, stack.poll()) + } + + @Test + fun testAddSingle() { + assertTrue(stack.tryAdd(10)) + assertEquals(10, stack.poll()) + } + + @Test + fun testAddLater() { + assertTrue(stack.tryAdd(10)) + assertFalse(stack.tryAdd(15)) + assertEquals(10, stack.poll()) + } + + @Test + fun testAddEarlier() { + assertTrue(stack.tryAdd(10)) + assertTrue(stack.tryAdd(5)) + assertEquals(5, stack.poll()) + assertEquals(10, stack.poll()) + } + + @Test + fun testCapacityExceeded() { + assertTrue(stack.tryAdd(10)) + assertTrue(stack.tryAdd(5)) + assertTrue(stack.tryAdd(2)) + assertEquals(2, stack.poll()) + assertEquals(5, stack.poll()) + assertEquals(10, stack.poll()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt new file mode 100644 index 00000000..ba339ee3 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.SimpleFlowSource +import org.opendc.simulator.kotlin.runSimulation + +/** + * Test suite for the [MaxMinFlowMultiplexer] class. + */ +class MaxMinFlowMultiplexerTest { + @Test + fun testSmoke() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val switch = MaxMinFlowMultiplexer(graph) + + val sinks = List(2) { SimpleFlowSink(graph, 2000.0f) } + for (source in sinks) { + graph.connect(switch.newOutput(), source.input) + } + + val source = SimpleFlowSource(graph, 2000.0f, 1.0f) + graph.connect(source.output, switch.newInput()) + + advanceUntilIdle() + + assertEquals(500, clock.millis()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt new file mode 100644 index 00000000..a75efba3 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.source.SimpleFlowSource +import org.opendc.simulator.flow2.source.TraceFlowSource +import org.opendc.simulator.kotlin.runSimulation +import java.util.concurrent.ThreadLocalRandom + +/** + * Test suite for the [SimpleFlowSink] class. + */ +class FlowSinkTest { + @Test + fun testSmoke() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val source = SimpleFlowSource(graph, 2.0f, 1.0f) + + graph.connect(source.output, sink.input) + advanceUntilIdle() + + assertEquals(2000, clock.millis()) + } + + @Test + fun testAdjustCapacity() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val source = SimpleFlowSource(graph, 2.0f, 1.0f) + + graph.connect(source.output, sink.input) + + delay(1000) + sink.capacity = 0.5f + + advanceUntilIdle() + + assertEquals(3000, clock.millis()) + } + + @Test + fun testUtilization() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val source = SimpleFlowSource(graph, 2.0f, 0.5f) + + graph.connect(source.output, sink.input) + advanceUntilIdle() + + assertEquals(4000, clock.millis()) + } + + @Test + fun testFragments() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val trace = TraceFlowSource.Trace( + longArrayOf(1000, 2000, 3000, 4000), + floatArrayOf(1.0f, 0.5f, 2.0f, 1.0f), + 4 + ) + val source = TraceFlowSource( + graph, + trace + ) + + graph.connect(source.output, sink.input) + advanceUntilIdle() + + assertEquals(4000, clock.millis()) + } + + @Test + fun benchmarkSink() { + val random = ThreadLocalRandom.current() + val traceSize = 10000000 + val trace = TraceFlowSource.Trace( + LongArray(traceSize) { it * 1000L }, + FloatArray(traceSize) { random.nextDouble(0.0, 4500.0).toFloat() }, + traceSize + ) + + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val sink = SimpleFlowSink(graph, 4200.0f) + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, sink.input) + } + } +} |
