summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/test')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt104
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt321
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt241
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt154
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt149
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt57
6 files changed, 1026 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
new file mode 100644
index 00000000..fe39eb2c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+
+/**
+ * A test suite for the [FlowConsumerContextImpl] class.
+ */
+class FlowConsumerContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ engine.scheduleSync(engine.clock.millis(), context)
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(0.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
+ }
+
+ @Test
+ fun testIdempotentCapacityChange() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 4200.0
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
new file mode 100644
index 00000000..12e72b8f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -0,0 +1,321 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Disabled
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+
+/**
+ * A test suite for the [FlowForwarder] class.
+ */
+internal class FlowForwarderTest {
+ @Test
+ fun testCancelImmediately() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ })
+
+ forwarder.close()
+ source.cancel()
+ }
+
+ @Test
+ fun testCancel() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(object : FlowSource {
+ var isFirst = true
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 10 * 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ forwarder.close()
+ source.cancel()
+ }
+
+ @Test
+ fun testState() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ }
+
+ assertFalse(forwarder.isActive)
+
+ forwarder.startConsumer(consumer)
+ assertTrue(forwarder.isActive)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertFalse(forwarder.isActive)
+
+ forwarder.close()
+ assertFalse(forwarder.isActive)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ })
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onStop(any(), any(), any()) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ }
+
+ @Test
+ fun testExitPropagation() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ }
+
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+ yield()
+
+ assertFalse(forwarder.isActive)
+ }
+
+ @Test
+ @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val sink = FlowSink(engine, 1.0)
+
+ val source = spyk(FixedFlowSource(2.0, 1.0))
+ sink.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(source) }
+ delay(1000)
+ sink.capacity = 0.5
+ }
+
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { source.onPull(any(), any(), any()) }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
+
+ val consumer = FixedFlowSource(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ yield()
+
+ assertEquals(2.0, source.counters.actual)
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testCoupledExit() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(FixedFlowSource(2000.0, 1.0))
+
+ yield()
+
+ assertFalse(source.isActive)
+ }
+
+ @Test
+ fun testPullFailureCoupled() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertFalse(source.isActive)
+ }
+
+ @Test
+ fun testStartFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertTrue(source.isActive)
+ source.cancel()
+ }
+
+ @Test
+ fun testConvergeFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.shouldSourceConverge = true
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertTrue(source.isActive)
+ source.cancel()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
new file mode 100644
index 00000000..70c75864
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+
+/**
+ * A test suite for the [FlowSink] class.
+ */
+internal class FlowSinkTest {
+ @Test
+ fun testSpeed() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(4200.0, 1.0)
+
+ val res = mutableListOf<Double>()
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(engine, 1.0)
+
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
+
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, clock.millis())
+ verify(exactly = 3) { consumer.onPull(any(), any(), any()) }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 2.0)
+
+ val res = mutableListOf<Double>()
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
+ * [FlowSource.onPull].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.pull()
+ }
+ }
+
+ provider.consume(consumer)
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+ lateinit var resCtx: FlowConnection
+
+ val consumer = object : FlowSource {
+ var isFirst = true
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ resCtx = conn
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 4000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ launch {
+ yield()
+ resCtx.pull()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ }
+
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ throw IllegalStateException("Hi")
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ var isFirst = true
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 1000
+ } else {
+ throw IllegalStateException()
+ }
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 1.0)
+
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testCancelDuringConsumption() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 1.0)
+
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.cancel()
+
+ yield()
+
+ assertEquals(500, clock.millis())
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ }
+
+ provider.consume(consumer)
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
new file mode 100644
index 00000000..187dacd9
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+import org.opendc.simulator.flow.source.TraceFlowSource
+
+/**
+ * Test suite for the [ForwardingFlowMultiplexer] class.
+ */
+internal class ForwardingFlowMultiplexerTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+ val forwarder = FlowForwarder(engine)
+ val adapter = FlowSourceRateAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ forwarder.startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = FixedFlowSource(duration * 3.2, 1.0)
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ source.startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+
+ assertEquals(duration, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : FlowSource {
+ var isFirst = true
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ isFirst = true
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ source.startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ source.startConsumer(switch.newOutput())
+
+ switch.newInput()
+ assertThrows<IllegalStateException> { switch.newInput() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
new file mode 100644
index 00000000..6e2cdb98
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.TraceFlowSource
+
+/**
+ * Test suite for the [FlowMultiplexer] implementations
+ */
+internal class MaxMinFlowMultiplexerTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(scheduler)
+
+ val sources = List(2) { FlowSink(scheduler, 2000.0) }
+ sources.forEach { it.startConsumer(switch.newOutput()) }
+
+ val provider = switch.newInput()
+ val consumer = FixedFlowSource(2000.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.clear()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L
+ val workload =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
+ val provider = switch.newInput()
+
+ try {
+ sink.startConsumer(switch.newOutput())
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.clear()
+ }
+
+ assertAll(
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L
+ val workloadA =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3100.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
+ val providerA = switch.newInput()
+ val providerB = switch.newInput()
+
+ try {
+ sink.startConsumer(switch.newOutput())
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.clear()
+ }
+ assertAll(
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
new file mode 100644
index 00000000..8396d346
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+
+/**
+ * A test suite for the [FixedFlowSource] class.
+ */
+internal class FixedFlowSourceTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
+
+ val consumer = FixedFlowSource(1.0, 1.0)
+
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testUtilization() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
+
+ val consumer = FixedFlowSource(1.0, 0.5)
+
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
+ }
+}