diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/test')
6 files changed, 0 insertions, 1048 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt deleted file mode 100644 index f89133dd..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import net.bytebuddy.matcher.ElementMatchers.any -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowConsumerContextImpl -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowConsumerContextImpl] class. - */ -class FlowConsumerContextTest { - @Test - fun testFlushWithoutCommand() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(1.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - - engine.scheduleSync(engine.clock.millis(), context) - } - - @Test - fun testDoubleStart() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(0.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - - context.start() - - assertThrows<IllegalStateException> { - context.start() - } - } - - @Test - fun testIdempotentCapacityChange() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(1.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - }) - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - context.capacity = 4200.0 - context.start() - context.capacity = 4200.0 - - verify(exactly = 1) { consumer.onPull(any(), any()) } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt deleted file mode 100644 index f75e5037..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import net.bytebuddy.matcher.ElementMatchers.any -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Disabled -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowForwarder] class. - */ -internal class FlowForwarderTest { - @Test - fun testCancelImmediately() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testCancel() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(object : FlowSource { - var isFirst = true - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 10 * 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testState() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - } - - assertFalse(forwarder.isActive) - - forwarder.startConsumer(consumer) - assertTrue(forwarder.isActive) - - assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } - - forwarder.cancel() - assertFalse(forwarder.isActive) - - forwarder.close() - assertFalse(forwarder.isActive) - } - - @Test - fun testCancelPendingDelegate() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - }) - - forwarder.startConsumer(consumer) - forwarder.cancel() - - verify(exactly = 0) { consumer.onStop(any(), any()) } - } - - @Test - fun testCancelStartedDelegate() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - val consumer = spyk(FixedFlowSource(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - forwarder.cancel() - - verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any()) } - } - - @Test - fun testCancelPropagation() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - val consumer = spyk(FixedFlowSource(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - source.cancel() - - verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any()) } - } - - @Test - fun testExitPropagation() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - } - - source.startConsumer(forwarder) - forwarder.consume(consumer) - yield() - - assertFalse(forwarder.isActive) - } - - @Test - @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368 - fun testAdjustCapacity() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val sink = FlowSink(engine, 1.0) - - val source = spyk(FixedFlowSource(2.0, 1.0)) - sink.startConsumer(forwarder) - - coroutineScope { - launch { forwarder.consume(source) } - delay(1000) - sink.capacity = 0.5 - } - - assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any()) } - } - - @Test - fun testCounters() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 1.0) - - val consumer = FixedFlowSource(2.0, 1.0) - source.startConsumer(forwarder) - - forwarder.consume(consumer) - - yield() - - assertAll( - { assertEquals(2.0, source.counters.actual) }, - { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, - { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, - { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, - { assertEquals(2000, clock.millis()) } - ) - } - - @Test - fun testCoupledExit() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(FixedFlowSource(2000.0, 1.0)) - - yield() - - assertFalse(source.isActive) - } - - @Test - fun testPullFailureCoupled() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertFalse(source.isActive) - } - - @Test - fun testStartFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - - override fun onStart(conn: FlowConnection, now: Long) { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertTrue(source.isActive) - source.cancel() - } - - @Test - fun testConvergeFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - conn.shouldSourceConverge = true - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - - override fun onConverge(conn: FlowConnection, now: Long) { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertTrue(source.isActive) - source.cancel() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt deleted file mode 100644 index 746d752d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowSink] class. - */ -internal class FlowSinkTest { - @Test - fun testSpeed() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(4200.0, 1.0) - - val res = mutableListOf<Double>() - val adapter = FlowSourceRateAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } - - @Test - fun testAdjustCapacity() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(engine, 1.0) - - val consumer = spyk(FixedFlowSource(2.0, 1.0)) - - coroutineScope { - launch { provider.consume(consumer) } - delay(1000) - provider.capacity = 0.5 - } - assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any()) } - } - - @Test - fun testSpeedLimit() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 2.0) - - val res = mutableListOf<Double>() - val adapter = FlowSourceRateAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } - - /** - * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or - * [FlowSource.onPull]. - */ - @Test - fun testIntermediateInterrupt() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - - override fun onStart(conn: FlowConnection, now: Long) { - conn.pull() - } - } - - provider.consume(consumer) - } - - @Test - fun testInterrupt() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - lateinit var resCtx: FlowConnection - - val consumer = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - resCtx = conn - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 4000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - launch { - yield() - resCtx.pull() - } - provider.consume(consumer) - - assertEquals(0, clock.millis()) - } - - @Test - fun testFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - throw IllegalStateException("Hi") - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - } - - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } - - @Test - fun testExceptionPropagationOnNext() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - var isFirst = true - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 1000 - } else { - throw IllegalStateException() - } - } - } - - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } - - @Test - fun testConcurrentConsumption() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 1.0) - - assertThrows<IllegalStateException> { - coroutineScope { - launch { provider.consume(consumer) } - provider.consume(consumer) - } - } - } - - @Test - fun testCancelDuringConsumption() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 1.0) - - launch { provider.consume(consumer) } - delay(500) - provider.cancel() - - yield() - - assertEquals(500, clock.millis()) - } - - @Test - fun testInfiniteSleep() { - assertThrows<IllegalStateException> { - runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE - } - - provider.consume(consumer) - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt deleted file mode 100644 index 2409e174..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowForwarder -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class ForwardingFlowMultiplexerTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf<Double>() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - forwarder.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - isFirst = true - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - switch.newInput() - assertThrows<IllegalStateException> { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt deleted file mode 100644 index a6bf8ad8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * Test suite for the [FlowMultiplexer] implementations - */ -internal class MaxMinFlowMultiplexerTest { - @Test - fun testSmoke() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(scheduler) - - val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { it.startConsumer(switch.newOutput()) } - - val provider = switch.newInput() - val consumer = FixedFlowSource(2000.0, 1.0) - - try { - provider.consume(consumer) - yield() - } finally { - switch.clear() - } - } - - /** - * Test overcommitting of resources via the hypervisor with a single VM. - */ - @Test - fun testOvercommittedSingle() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val sink = FlowSink(scheduler, 3200.0) - val provider = switch.newInput() - - try { - sink.startConsumer(switch.newOutput()) - provider.consume(workload) - yield() - } finally { - switch.clear() - } - - assertAll( - { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } - - /** - * Test overcommitting of resources via the hypervisor with two VMs. - */ - @Test - fun testOvercommittedDual() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workloadA = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - val workloadB = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3100.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 73.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val sink = FlowSink(scheduler, 3200.0) - val providerA = switch.newInput() - val providerB = switch.newInput() - - try { - sink.startConsumer(switch.newOutput()) - - coroutineScope { - launch { providerA.consume(workloadA) } - providerB.consume(workloadB) - } - - yield() - } finally { - switch.clear() - } - assertAll( - { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt deleted file mode 100644 index 552579ff..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FixedFlowSource] class. - */ -internal class FixedFlowSourceTest { - @Test - fun testSmoke() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(scheduler, 1.0) - - val consumer = FixedFlowSource(1.0, 1.0) - - provider.consume(consumer) - assertEquals(1000, clock.millis()) - } - - @Test - fun testUtilization() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(scheduler, 1.0) - - val consumer = FixedFlowSource(1.0, 0.5) - - provider.consume(consumer) - assertEquals(2000, clock.millis()) - } -} |
