From b0fc93f818e5e735e972a04f5aa49e0ebe1de181 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 14:35:30 +0200 Subject: refactor(simulator): Remove failure callback from FlowSource This change removes the `onFailure` method from FlowSource. Instead, the FlowConsumer will receive the reason for failure of the source. --- .../org/opendc/simulator/flow/FlowConsumer.kt | 26 ++-- .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 5 +- .../org/opendc/simulator/flow/FlowForwarder.kt | 85 ++++++----- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 2 +- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 8 -- .../flow/internal/FlowConsumerContextImpl.kt | 32 +++-- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 2 +- .../simulator/flow/source/FlowSourceRateAdapter.kt | 44 +++--- .../simulator/flow/FlowConsumerContextTest.kt | 31 ---- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 96 +++++++++++++ .../flow/mux/ExclusiveFlowMultiplexerTest.kt | 157 +++++++++++++++++++++ .../flow/mux/MaxMinFlowMultiplexerTest.kt | 147 +++++++++++++++++++ .../flow/mux/SimResourceSwitchExclusiveTest.kt | 157 --------------------- .../flow/mux/SimResourceSwitchMaxMinTest.kt | 147 ------------------- 14 files changed, 513 insertions(+), 426 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index 3a6e2e97..df2c4fab 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -82,22 +82,26 @@ public interface FlowConsumer { */ public suspend fun FlowConsumer.consume(source: FlowSource) { return suspendCancellableCoroutine { cont -> - startConsumer(object : FlowSource by source { - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - source.onEvent(conn, now, event) - - if (event == FlowEvent.Exit && !cont.isCompleted) { - cont.resume(Unit) + startConsumer(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return try { + source.onPull(conn, now, delta) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause } } - override fun onFailure(conn: FlowConnection, cause: Throwable) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { try { - source.onFailure(conn, cause) + source.onEvent(conn, now, event) + + if (event == FlowEvent.Exit && !cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { cont.resumeWithException(cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - cont.resumeWithException(e) + throw cause } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index c69cb17e..ef94ab22 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -46,11 +46,12 @@ public interface FlowConsumerLogic { public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} /** - * This method is invoked when the [FlowSource] is completed. + * This method is invoked when the [FlowSource] completed or failed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + * @param cause The cause of the failure or `null` if the source completed. */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 2074033e..bc01a11b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow +import mu.KotlinLogging import org.opendc.simulator.flow.internal.FlowCountersImpl /** @@ -31,6 +32,11 @@ import org.opendc.simulator.flow.internal.FlowCountersImpl * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. */ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { + /** + * The logging instance of this connection. + */ + private val logger = KotlinLogging.logger {} + /** * The delegate [FlowSource]. */ @@ -59,23 +65,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } override fun push(rate: Double) { + if (delegate == null) { + return + } + _innerCtx?.push(rate) _demand = rate } override fun close() { - val delegate = checkNotNull(delegate) { "Delegate not active" } - - if (isCoupled) - _innerCtx?.close() - else - _innerCtx?.push(0.0) + val delegate = delegate ?: return + val hasDelegateStarted = hasDelegateStarted // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards reset() - delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + if (hasDelegateStarted) { + delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + } } } @@ -114,16 +122,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } override fun cancel() { - val delegate = delegate - val ctx = _innerCtx - - if (delegate != null) { - this.delegate = null - - if (ctx != null) { - delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit) - } - } + _ctx.close() } override fun close() { @@ -144,34 +143,42 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled updateCounters(conn, delta) - return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + return try { + delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + + reset() + Long.MAX_VALUE + } } override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - FlowEvent.Start -> { - _innerCtx = conn - } + FlowEvent.Start -> _innerCtx = conn FlowEvent.Exit -> { _innerCtx = null val delegate = delegate if (delegate != null) { reset() - delegate.onEvent(this._ctx, now, FlowEvent.Exit) + + try { + delegate.onEvent(this._ctx, now, FlowEvent.Exit) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + } } } - else -> delegate?.onEvent(this._ctx, now, event) - } - } - - override fun onFailure(conn: FlowConnection, cause: Throwable) { - _innerCtx = null + else -> + try { + delegate?.onEvent(this._ctx, now, event) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } - val delegate = delegate - if (delegate != null) { - reset() - delegate.onFailure(this._ctx, cause) + _innerCtx = null + reset() + } } } @@ -180,15 +187,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled */ private fun start() { val delegate = delegate ?: return - delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start) - hasDelegateStarted = true + try { + delegate.onEvent(_ctx, engine.clock.millis(), FlowEvent.Start) + hasDelegateStarted = true + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + reset() + } } /** * Reset the delegate. */ private fun reset() { + if (isCoupled) + _innerCtx?.close() + else + _innerCtx?.push(0.0) + delegate = null hasDelegateStarted = false } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index fb6ca85d..fc590177 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -46,7 +46,7 @@ public class FlowSink( updateCounters(ctx, delta) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { updateCounters(ctx, delta) cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 077b4d38..70687b4f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -47,12 +47,4 @@ public interface FlowSource { * @param event The event that has occurred. */ public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} - - /** - * This method is invoked when the source throws an exception. - * - * @param conn The connection between the source and consumer. - * @param cause The cause of the failure. - */ - public fun onFailure(conn: FlowConnection, cause: Throwable) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index fc9c8059..a74f89b4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow.internal +import mu.KotlinLogging import org.opendc.simulator.flow.* import java.util.ArrayDeque import kotlin.math.max @@ -35,6 +36,11 @@ internal class FlowConsumerContextImpl( private val source: FlowSource, private val logic: FlowConsumerLogic ) : FlowConsumerContext { + /** + * The logging instance of this connection. + */ + private val logger = KotlinLogging.logger {} + /** * The capacity of the connection. */ @@ -131,7 +137,7 @@ internal class FlowConsumerContextImpl( if (!_isUpdateActive) { val now = _clock.millis() val delta = max(0, now - _lastPull) - doStop(now, delta) + doStopSource(now, delta) // FIX: Make sure the context converges pull() @@ -231,7 +237,7 @@ internal class FlowConsumerContextImpl( logic.onPush(this, now, pushDelta, demand) } } - State.Closed -> doStop(now, pushDelta) + State.Closed -> doStopSource(now, pushDelta) State.Pending -> throw IllegalStateException("Illegal transition to pending state") } @@ -246,7 +252,7 @@ internal class FlowConsumerContextImpl( // Schedule an update at the new deadline scheduleDelayed(now, deadline) } catch (cause: Throwable) { - doFail(now, pushDelta, cause) + doFailSource(now, pushDelta, cause) } finally { _isUpdateActive = false } @@ -288,21 +294,21 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, timestamp, delta) } catch (cause: Throwable) { - doFail(timestamp, max(0, timestamp - _lastPull), cause) + doFailSource(timestamp, max(0, timestamp - _lastPull), cause) } } override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" /** - * Stop the resource context. + * Stop the [FlowSource]. */ - private fun doStop(now: Long, delta: Long) { + private fun doStopSource(now: Long, delta: Long) { try { source.onEvent(this, now, FlowEvent.Exit) - logic.onFinish(this, now, delta) + logic.onFinish(this, now, delta, null) } catch (cause: Throwable) { - doFail(now, delta, cause) + doFailSource(now, delta, cause) } finally { _deadline = Long.MAX_VALUE _demand = 0.0 @@ -310,17 +316,15 @@ internal class FlowConsumerContextImpl( } /** - * Fail the resource consumer. + * Fail the [FlowSource]. */ - private fun doFail(now: Long, delta: Long, cause: Throwable) { + private fun doFailSource(now: Long, delta: Long, cause: Throwable) { try { - source.onFailure(this, cause) + logic.onFinish(this, now, delta, cause) } catch (e: Throwable) { e.addSuppressed(cause) - e.printStackTrace() + logger.error(e) { "Uncaught exception" } } - - logic.onFinish(this, now, delta) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 9735f121..a3e108f6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -292,7 +292,7 @@ public class MaxMinFlowMultiplexer( parent?.onConverge(now) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { doUpdateCounters(delta) limit = 0.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 7fcc0405..fcee3906 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -28,7 +28,7 @@ import org.opendc.simulator.flow.FlowSource import kotlin.math.min /** - * Helper class to expose an observable [speed] field describing the speed of the consumer. + * Helper class to expose an observable [rate] field describing the flow rate of the source. */ public class FlowSourceRateAdapter( private val delegate: FlowSource, @@ -37,7 +37,7 @@ public class FlowSourceRateAdapter( /** * The resource processing speed at this instant. */ - public var speed: Double = 0.0 + public var rate: Double = 0.0 private set(value) { if (field != value) { callback(value) @@ -50,33 +50,37 @@ public class FlowSourceRateAdapter( } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + return try { + delegate.onPull(conn, now, delta) + } catch (cause: Throwable) { + rate = 0.0 + throw cause + } } override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - val oldSpeed = speed + val oldSpeed = rate - delegate.onEvent(conn, now, event) + try { + delegate.onEvent(conn, now, event) - when (event) { - FlowEvent.Converge -> speed = conn.rate - FlowEvent.Capacity -> { - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == speed) { - speed = min(conn.capacity, speed) + when (event) { + FlowEvent.Converge -> rate = conn.rate + FlowEvent.Capacity -> { + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == rate) { + rate = min(conn.capacity, rate) + } } + FlowEvent.Exit -> rate = 0.0 + else -> {} } - FlowEvent.Exit -> speed = 0.0 - else -> {} + } catch (cause: Throwable) { + rate = 0.0 + throw cause } } - override fun onFailure(conn: FlowConnection, cause: Throwable) { - speed = 0.0 - - delegate.onFailure(conn, cause) - } - override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index 380fd38a..f1a5cbe4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow import io.mockk.* -import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.internal.FlowConsumerContextImpl @@ -102,34 +101,4 @@ class FlowConsumerContextTest { verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } } - - @Test - fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - conn.close() - return Long.MAX_VALUE - } - - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - if (event == FlowEvent.Exit) throw IllegalStateException("onEvent") - } - - override fun onFailure(conn: FlowConnection, cause: Throwable) { - throw IllegalStateException("onFailure") - } - }) - - val logic = object : FlowConsumerLogic {} - - val context = FlowConsumerContextImpl(engine, consumer, logic) - - context.start() - - delay(1) - - verify(exactly = 1) { consumer.onFailure(any(), any()) } - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index cbc48a4e..d125c638 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -219,4 +219,100 @@ internal class FlowForwarderTest { assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } assertEquals(2000, clock.millis()) } + + @Test + fun testCoupledExit() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + forwarder.consume(FixedFlowSource(2000.0, 1.0)) + + yield() + + assertFalse(source.isActive) + } + + @Test + fun testPullFailureCoupled() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + try { + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + throw IllegalStateException("Test") + } + }) + } catch (cause: Throwable) { + // Ignore + } + + yield() + + assertFalse(source.isActive) + } + + @Test + fun testEventFailure() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + try { + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + throw IllegalStateException("Test") + } + }) + } catch (cause: Throwable) { + // Ignore + } + + yield() + + assertTrue(source.isActive) + source.cancel() + } + + @Test + fun testEventConvergeFailure() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + try { + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Converge) { + throw IllegalStateException("Test") + } + } + }) + } catch (cause: Throwable) { + // Ignore + } + + yield() + + assertTrue(source.isActive) + source.cancel() + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt new file mode 100644 index 00000000..c8627446 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +internal class ExclusiveFlowMultiplexerTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val speed = mutableListOf() + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addOutput(forwarder) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = FixedFlowSource(duration * 3.2, 1.0) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : FlowSource { + var isFirst = true + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> isFirst = true + else -> {} + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + provider.consume(workload) + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + switch.newInput() + assertThrows { switch.newInput() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt new file mode 100644 index 00000000..9f6b8a2c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [FlowMultiplexer] implementations + */ +internal class MaxMinFlowMultiplexerTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val switch = MaxMinFlowMultiplexer(scheduler) + + val sources = List(2) { FlowSink(scheduler, 2000.0) } + sources.forEach { switch.addOutput(it) } + + val provider = switch.newInput() + val consumer = FixedFlowSource(2000.0, 1.0) + + try { + provider.consume(consumer) + yield() + } finally { + switch.clear() + } + } + + /** + * Test overcommitting of resources via the hypervisor with a single VM. + */ + @Test + fun testOvercommittedSingle() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val provider = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + provider.consume(workload) + yield() + } finally { + switch.clear() + } + + assertAll( + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workloadA = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + val workloadB = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3100.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 73.0) + ) + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val providerA = switch.newInput() + val providerB = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + + coroutineScope { + launch { providerA.consume(workloadA) } + providerB.consume(workloadB) + } + + yield() + } finally { + switch.clear() + } + assertAll( + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt deleted file mode 100644 index b503087e..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class SimResourceSwitchExclusiveTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - switch.addOutput(forwarder) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> isFirst = true - else -> {} - } - } - - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - switch.newInput() - assertThrows { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt deleted file mode 100644 index 089a8d78..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.TraceFlowSource - -/** - * Test suite for the [FlowMultiplexer] implementations - */ -internal class SimResourceSwitchMaxMinTest { - @Test - fun testSmoke() = runBlockingSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(scheduler) - - val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } - - val provider = switch.newInput() - val consumer = FixedFlowSource(2000.0, 1.0) - - try { - provider.consume(consumer) - yield() - } finally { - switch.clear() - } - } - - /** - * Test overcommitting of resources via the hypervisor with a single VM. - */ - @Test - fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val provider = switch.newInput() - - try { - switch.addOutput(FlowSink(scheduler, 3200.0)) - provider.consume(workload) - yield() - } finally { - switch.clear() - } - - assertAll( - { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } - - /** - * Test overcommitting of resources via the hypervisor with two VMs. - */ - @Test - fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workloadA = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - val workloadB = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3100.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 73.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val providerA = switch.newInput() - val providerB = switch.newInput() - - try { - switch.addOutput(FlowSink(scheduler, 3200.0)) - - coroutineScope { - launch { providerA.consume(workloadA) } - providerB.consume(workloadB) - } - - yield() - } finally { - switch.clear() - } - assertAll( - { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } -} -- cgit v1.2.3