summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/test')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt107
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt331
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt245
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt158
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt150
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt57
6 files changed, 0 insertions, 1048 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
deleted file mode 100644
index f89133dd..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowConsumerContextImpl] class.
- */
-class FlowConsumerContextTest {
- @Test
- fun testFlushWithoutCommand() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- engine.scheduleSync(engine.clock.millis(), context)
- }
-
- @Test
- fun testDoubleStart() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(0.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- context.start()
-
- assertThrows<IllegalStateException> {
- context.start()
- }
- }
-
- @Test
- fun testIdempotentCapacityChange() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (now == 0L) {
- conn.push(1.0)
- 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- val logic = object : FlowConsumerLogic {}
- val context = FlowConsumerContextImpl(engine, consumer, logic)
- context.capacity = 4200.0
- context.start()
- context.capacity = 4200.0
-
- verify(exactly = 1) { consumer.onPull(any(), any()) }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
deleted file mode 100644
index f75e5037..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import net.bytebuddy.matcher.ElementMatchers.any
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Disabled
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowForwarder] class.
- */
-internal class FlowForwarderTest {
- @Test
- fun testCancelImmediately() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testCancel() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 10 * 1000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- })
-
- forwarder.close()
- source.cancel()
- }
-
- @Test
- fun testState() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- assertFalse(forwarder.isActive)
-
- forwarder.startConsumer(consumer)
- assertTrue(forwarder.isActive)
-
- assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
-
- forwarder.cancel()
- assertFalse(forwarder.isActive)
-
- forwarder.close()
- assertFalse(forwarder.isActive)
- }
-
- @Test
- fun testCancelPendingDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
-
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- })
-
- forwarder.startConsumer(consumer)
- forwarder.cancel()
-
- verify(exactly = 0) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelStartedDelegate() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- forwarder.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testCancelPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = spyk(FixedFlowSource(2000.0, 1.0))
-
- source.startConsumer(forwarder)
- yield()
- forwarder.startConsumer(consumer)
- yield()
- source.cancel()
-
- verify(exactly = 1) { consumer.onStart(any(), any()) }
- verify(exactly = 1) { consumer.onStop(any(), any()) }
- }
-
- @Test
- fun testExitPropagation() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
- }
-
- source.startConsumer(forwarder)
- forwarder.consume(consumer)
- yield()
-
- assertFalse(forwarder.isActive)
- }
-
- @Test
- @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val sink = FlowSink(engine, 1.0)
-
- val source = spyk(FixedFlowSource(2.0, 1.0))
- sink.startConsumer(forwarder)
-
- coroutineScope {
- launch { forwarder.consume(source) }
- delay(1000)
- sink.capacity = 0.5
- }
-
- assertEquals(3000, clock.millis())
- verify(exactly = 1) { source.onPull(any(), any()) }
- }
-
- @Test
- fun testCounters() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 1.0)
-
- val consumer = FixedFlowSource(2.0, 1.0)
- source.startConsumer(forwarder)
-
- forwarder.consume(consumer)
-
- yield()
-
- assertAll(
- { assertEquals(2.0, source.counters.actual) },
- { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } },
- { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } },
- { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } },
- { assertEquals(2000, clock.millis()) }
- )
- }
-
- @Test
- fun testCoupledExit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- forwarder.consume(FixedFlowSource(2000.0, 1.0))
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testPullFailureCoupled() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine, isCoupled = true)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertFalse(source.isActive)
- }
-
- @Test
- fun testStartFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-
- @Test
- fun testConvergeFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 2000.0)
-
- launch { source.consume(forwarder) }
-
- try {
- forwarder.consume(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Test")
- }
- })
- } catch (cause: Throwable) {
- // Ignore
- }
-
- yield()
-
- assertTrue(source.isActive)
- source.cancel()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
deleted file mode 100644
index 746d752d..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import io.mockk.spyk
-import io.mockk.verify
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FlowSink] class.
- */
-internal class FlowSinkTest {
- @Test
- fun testSpeed() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(4200.0, 1.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- @Test
- fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(engine, 1.0)
-
- val consumer = spyk(FixedFlowSource(2.0, 1.0))
-
- coroutineScope {
- launch { provider.consume(consumer) }
- delay(1000)
- provider.capacity = 0.5
- }
- assertEquals(3000, clock.millis())
- verify(exactly = 3) { consumer.onPull(any(), any()) }
- }
-
- @Test
- fun testSpeedLimit() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 2.0)
-
- val res = mutableListOf<Double>()
- val adapter = FlowSourceRateAdapter(consumer, res::add)
-
- provider.consume(adapter)
-
- assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
- }
-
- /**
- * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
- * [FlowSource.onPull].
- */
- @Test
- fun testIntermediateInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.pull()
- }
- }
-
- provider.consume(consumer)
- }
-
- @Test
- fun testInterrupt() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
- lateinit var resCtx: FlowConnection
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- resCtx = conn
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 4000
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- launch {
- yield()
- resCtx.pull()
- }
- provider.consume(consumer)
-
- assertEquals(0, clock.millis())
- }
-
- @Test
- fun testFailure() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- throw IllegalStateException("Hi")
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return Long.MAX_VALUE
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testExceptionPropagationOnNext() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- var isFirst = true
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- 1000
- } else {
- throw IllegalStateException()
- }
- }
- }
-
- assertThrows<IllegalStateException> {
- provider.consume(consumer)
- }
- }
-
- @Test
- fun testConcurrentConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- assertThrows<IllegalStateException> {
- coroutineScope {
- launch { provider.consume(consumer) }
- provider.consume(consumer)
- }
- }
- }
-
- @Test
- fun testCancelDuringConsumption() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = FixedFlowSource(capacity, 1.0)
-
- launch { provider.consume(consumer) }
- delay(500)
- provider.cancel()
-
- yield()
-
- assertEquals(500, clock.millis())
- }
-
- @Test
- fun testInfiniteSleep() {
- assertThrows<IllegalStateException> {
- runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
- val capacity = 4200.0
- val provider = FlowSink(engine, capacity)
-
- val consumer = object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE
- }
-
- provider.consume(consumer)
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
deleted file mode 100644
index 2409e174..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.FlowSourceRateAdapter
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [ForwardingFlowMultiplexer] class.
- */
-internal class ForwardingFlowMultiplexerTest {
- /**
- * Test a trace workload.
- */
- @Test
- fun testTrace() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val speed = mutableListOf<Double>()
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
- val forwarder = FlowForwarder(engine)
- val adapter = FlowSourceRateAdapter(forwarder, speed::add)
- source.startConsumer(adapter)
- forwarder.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertAll(
- { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
- { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
- )
- }
-
- /**
- * Test runtime workload on hypervisor.
- */
- @Test
- fun testRuntimeWorkload() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = FixedFlowSource(duration * 3.2, 1.0)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
-
- assertEquals(duration, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test two workloads running sequentially.
- */
- @Test
- fun testTwoWorkloads() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L * 1000
- val workload = object : FlowSource {
- var isFirst = true
-
- override fun onStart(conn: FlowConnection, now: Long) {
- isFirst = true
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return if (isFirst) {
- isFirst = false
- conn.push(1.0)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
- }
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- val provider = switch.newInput()
- provider.consume(workload)
- yield()
- provider.consume(workload)
- assertEquals(duration * 2, clock.millis()) { "Took enough time" }
- }
-
- /**
- * Test concurrent workloads on the machine.
- */
- @Test
- fun testConcurrentWorkloadFails() = runSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val switch = ForwardingFlowMultiplexer(engine)
- val source = FlowSink(engine, 3200.0)
-
- source.startConsumer(switch.newOutput())
-
- switch.newInput()
- assertThrows<IllegalStateException> { switch.newInput() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
deleted file mode 100644
index a6bf8ad8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.flow.source.FixedFlowSource
-import org.opendc.simulator.flow.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [FlowMultiplexer] implementations
- */
-internal class MaxMinFlowMultiplexerTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val switch = MaxMinFlowMultiplexer(scheduler)
-
- val sources = List(2) { FlowSink(scheduler, 2000.0) }
- sources.forEach { it.startConsumer(switch.newOutput()) }
-
- val provider = switch.newInput()
- val consumer = FixedFlowSource(2000.0, 1.0)
-
- try {
- provider.consume(consumer)
- yield()
- } finally {
- switch.clear()
- }
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with a single VM.
- */
- @Test
- fun testOvercommittedSingle() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workload =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val provider = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
- provider.consume(workload)
- yield()
- } finally {
- switch.clear()
- }
-
- assertAll(
- { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
- { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-
- /**
- * Test overcommitting of resources via the hypervisor with two VMs.
- */
- @Test
- fun testOvercommittedDual() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
-
- val duration = 5 * 60L
- val workloadA =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3500.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 183.0)
- )
- )
- val workloadB =
- TraceFlowSource(
- sequenceOf(
- TraceFlowSource.Fragment(duration * 1000, 28.0),
- TraceFlowSource.Fragment(duration * 1000, 3100.0),
- TraceFlowSource.Fragment(duration * 1000, 0.0),
- TraceFlowSource.Fragment(duration * 1000, 73.0)
- )
- )
-
- val switch = MaxMinFlowMultiplexer(scheduler)
- val sink = FlowSink(scheduler, 3200.0)
- val providerA = switch.newInput()
- val providerB = switch.newInput()
-
- try {
- sink.startConsumer(switch.newOutput())
-
- coroutineScope {
- launch { providerA.consume(workloadA) }
- providerB.consume(workloadB)
- }
-
- yield()
- } finally {
- switch.clear()
- }
- assertAll(
- { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
- { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
- { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
- { assertEquals(1200000, clock.millis()) }
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
deleted file mode 100644
index 552579ff..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow.FlowSink
-import org.opendc.simulator.flow.consume
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * A test suite for the [FixedFlowSource] class.
- */
-internal class FixedFlowSourceTest {
- @Test
- fun testSmoke() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
-
- val consumer = FixedFlowSource(1.0, 1.0)
-
- provider.consume(consumer)
- assertEquals(1000, clock.millis())
- }
-
- @Test
- fun testUtilization() = runSimulation {
- val scheduler = FlowEngineImpl(coroutineContext, clock)
- val provider = FlowSink(scheduler, 1.0)
-
- val consumer = FixedFlowSource(1.0, 0.5)
-
- provider.consume(consumer)
- assertEquals(2000, clock.millis())
- }
-}