diff options
Diffstat (limited to 'opendc-simulator')
19 files changed, 206 insertions, 231 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt index b05d8ad9..8400c225 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.device import org.opendc.simulator.compute.power.PowerDriver import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource import org.opendc.simulator.power.SimPowerInlet import java.util.* @@ -82,19 +81,22 @@ public class SimPsu( } override fun createConsumer(): FlowSource = object : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + _ctx = conn + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _ctx = null + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0) conn.push(powerDraw) return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> _ctx = conn - FlowEvent.Converge -> _powerDraw = conn.rate - FlowEvent.Exit -> _ctx = null - else -> {} - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + _powerDraw = conn.rate } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index b85be39d..cc4f1f6a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource /** @@ -41,29 +40,14 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { */ public fun waitFor(consumer: FlowSource): FlowSource { waiting.add(consumer) - return object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return try { - consumer.onPull(conn, now, delta) - } catch (cause: Throwable) { - complete(consumer) - throw cause - } - } - - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + return object : FlowSource by consumer { + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { try { - consumer.onEvent(conn, now, event) - - if (event == FlowEvent.Exit) { - complete(consumer) - } - } catch (cause: Throwable) { + consumer.onStop(conn, now, delta) + } finally { complete(consumer) - throw cause } } - override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index df2c4fab..4685a755 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -83,20 +83,20 @@ public interface FlowConsumer { public suspend fun FlowConsumer.consume(source: FlowSource) { return suspendCancellableCoroutine { cont -> startConsumer(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return try { - source.onPull(conn, now, delta) + override fun onStart(conn: FlowConnection, now: Long) { + try { + source.onStart(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { try { - source.onEvent(conn, now, event) + source.onStop(conn, now, delta) - if (event == FlowEvent.Exit && !cont.isCompleted) { + if (!cont.isCompleted) { cont.resume(Unit) } } catch (cause: Throwable) { @@ -105,6 +105,24 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return try { + source.onPull(conn, now, delta) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + try { + source.onConverge(conn, now, delta) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + override fun toString(): String = "SuspendingFlowSource" }) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt deleted file mode 100644 index bb6f25b1..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A flow event that is communicated to a [FlowSource]. - */ -public enum class FlowEvent { - /** - * This event is emitted to the source when it has started. - */ - Start, - - /** - * This event is emitted to the source when it is stopped. - */ - Exit, - - /** - * This event is emitted to the source when the system has converged into a steady state. - */ - Converge, -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index bc01a11b..ab5b31c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging import org.opendc.simulator.flow.internal.FlowCountersImpl +import kotlin.math.max /** * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. @@ -64,6 +65,8 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled _innerCtx?.pull() } + @JvmField var lastPull = Long.MAX_VALUE + override fun push(rate: Double) { if (delegate == null) { return @@ -82,7 +85,9 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled reset() if (hasDelegateStarted) { - delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + val now = engine.clock.millis() + val delta = max(0, now - lastPull) + delegate.onStop(this, now, delta) } } } @@ -134,6 +139,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } } + override fun onStart(conn: FlowConnection, now: Long) { + _innerCtx = conn + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + + try { + delegate.onStop(this._ctx, now, delta) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + } + } + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val delegate = delegate @@ -141,10 +165,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled start() } + _ctx.lastPull = now updateCounters(conn, delta) return try { - delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -153,32 +178,14 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> _innerCtx = conn - FlowEvent.Exit -> { - _innerCtx = null - - val delegate = delegate - if (delegate != null) { - reset() - - try { - delegate.onEvent(this._ctx, now, FlowEvent.Exit) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - } - } - } - else -> - try { - delegate?.onEvent(this._ctx, now, event) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - - _innerCtx = null - reset() - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate?.onConverge(this._ctx, now, delta) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + + _innerCtx = null + reset() } } @@ -189,7 +196,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val delegate = delegate ?: return try { - delegate.onEvent(_ctx, engine.clock.millis(), FlowEvent.Start) + delegate.onStart(_ctx, engine.clock.millis()) hasDelegateStarted = true } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 70687b4f..a4f624ef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -30,6 +30,23 @@ package org.opendc.simulator.flow */ public interface FlowSource { /** + * This method is invoked when the source is started. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the provider finished. + */ + public fun onStart(conn: FlowConnection, now: Long) {} + + /** + * This method is invoked when the source is finished. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the source finished. + * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. + */ + public fun onStop(conn: FlowConnection, now: Long, delta: Long) {} + + /** * This method is invoked when the source is pulled. * * @param conn The connection between the source and consumer. @@ -40,11 +57,11 @@ public interface FlowSource { public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long /** - * This method is invoked when an event has occurred. + * This method is invoked when the flow graph has converged into a steady-state system. * * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the event is occurring. - * @param event The event that has occurred. + * @param now The virtual timestamp in milliseconds at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} + public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index a86ed6ea..c235b9ae 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -125,7 +125,7 @@ internal class FlowConsumerContextImpl( override fun start() { check(_state == State.Pending) { "Consumer is already started" } engine.batch { - source.onEvent(this, _clock.millis(), FlowEvent.Start) + source.onStart(this, _clock.millis()) _state = State.Active pull() } @@ -140,8 +140,7 @@ internal class FlowConsumerContextImpl( _state = State.Closed if (!_isUpdateActive) { val now = _clock.millis() - val delta = max(0, now - _lastPull) - doStopSource(now, delta) + doStopSource(now) // FIX: Make sure the context converges pull() @@ -210,19 +209,16 @@ internal class FlowConsumerContextImpl( _isUpdateActive = true _isImmediateUpdateScheduled = false - val lastPush = _lastPush - val pushDelta = max(0, now - lastPush) - try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired val deadline = if (_isPulled || _deadline == now) { val lastPull = _lastPull - val pullDelta = max(0, now - lastPull) + val delta = max(0, now - lastPull) _isPulled = false _lastPull = now - val duration = source.onPull(this, now, pullDelta) + val duration = source.onPull(this, now, delta) if (duration != Long.MAX_VALUE) now + duration else @@ -236,12 +232,15 @@ internal class FlowConsumerContextImpl( State.Active -> { val demand = _demand if (demand != _activeDemand) { + val lastPush = _lastPush + val delta = max(0, now - lastPush) + _lastPush = now - logic.onPush(this, now, pushDelta, demand) + logic.onPush(this, now, delta, demand) } } - State.Closed -> doStopSource(now, pushDelta) + State.Closed -> doStopSource(now) State.Pending -> throw IllegalStateException("Illegal transition to pending state") } @@ -256,7 +255,7 @@ internal class FlowConsumerContextImpl( // Schedule an update at the new deadline scheduleDelayed(now, deadline) } catch (cause: Throwable) { - doFailSource(now, pushDelta, cause) + doFailSource(now, cause) } finally { _isUpdateActive = false } @@ -293,12 +292,12 @@ internal class FlowConsumerContextImpl( try { if (_state == State.Active) { - source.onEvent(this, timestamp, FlowEvent.Converge) + source.onConverge(this, timestamp, delta) } logic.onConverge(this, timestamp, delta) } catch (cause: Throwable) { - doFailSource(timestamp, max(0, timestamp - _lastPull), cause) + doFailSource(timestamp, cause) } } @@ -307,12 +306,13 @@ internal class FlowConsumerContextImpl( /** * Stop the [FlowSource]. */ - private fun doStopSource(now: Long, delta: Long) { + private fun doStopSource(now: Long) { try { - source.onEvent(this, now, FlowEvent.Exit) - logic.onFinish(this, now, delta, null) + source.onStop(this, now, max(0, now - _lastPull)) + doFinishConsumer(now, null) } catch (cause: Throwable) { - doFailSource(now, delta, cause) + doFinishConsumer(now, cause) + return } finally { _deadline = Long.MAX_VALUE _demand = 0.0 @@ -322,9 +322,24 @@ internal class FlowConsumerContextImpl( /** * Fail the [FlowSource]. */ - private fun doFailSource(now: Long, delta: Long, cause: Throwable) { + private fun doFailSource(now: Long, cause: Throwable) { + try { + source.onStop(this, now, max(0, now - _lastPull)) + } catch (e: Throwable) { + e.addSuppressed(cause) + doFinishConsumer(now, e) + } finally { + _deadline = Long.MAX_VALUE + _demand = 0.0 + } + } + + /** + * Finish the consumer. + */ + private fun doFinishConsumer(now: Long, cause: Throwable?) { try { - logic.onFinish(this, now, delta, cause) + logic.onFinish(this, now, max(0, now - _lastPush), cause) } catch (e: Throwable) { e.addSuppressed(cause) logger.error(e) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 811d9460..6dd9dcfb 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -88,13 +88,10 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul _availableOutputs += forwarder output.startConsumer(object : FlowSource by forwarder { - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - if (event == FlowEvent.Exit) { - // De-register the output after it has finished - _outputs -= output - } + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _outputs -= output - forwarder.onEvent(conn, now, event) + forwarder.onStop(conn, now, delta) } }) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index b98cf2f1..c7379fa9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -372,6 +372,19 @@ public class MaxMinFlowMultiplexer( provider.cancel() } + override fun onStart(conn: FlowConnection, now: Long) { + assert(_ctx == null) { "Source running concurrently" } + _ctx = conn + capacity = conn.capacity + updateCapacity() + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _ctx = null + capacity = 0.0 + updateCapacity() + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val capacity = capacity if (capacity != conn.capacity) { @@ -383,23 +396,6 @@ public class MaxMinFlowMultiplexer( return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn - capacity = conn.capacity - updateCapacity() - } - FlowEvent.Exit -> { - _ctx = null - capacity = 0.0 - updateCapacity() - } - else -> {} - } - } - override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 24ae64cb..0c39523f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.source import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource /** @@ -48,28 +47,22 @@ public class FlowSourceRateAdapter( callback(0.0) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return try { - delegate.onPull(conn, now, delta) - } catch (cause: Throwable) { + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { rate = 0.0 - throw cause } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - try { - delegate.onEvent(conn, now, event) + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } - when (event) { - FlowEvent.Converge -> rate = conn.rate - FlowEvent.Exit -> rate = 0.0 - else -> {} - } - } catch (cause: Throwable) { - rate = 0.0 - throw cause - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + + rate = conn.rate } override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index 4d3ae61a..ae537845 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.source import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource /** @@ -33,6 +32,15 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource private var _iterator: Iterator<Fragment>? = null private var _nextTarget = Long.MIN_VALUE + override fun onStart(conn: FlowConnection, now: Long) { + check(_iterator == null) { "Source already running" } + _iterator = trace.iterator() + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _iterator = null + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget @@ -52,21 +60,8 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> { - check(_iterator == null) { "Source already running" } - _iterator = trace.iterator() - } - FlowEvent.Exit -> { - _iterator = null - } - else -> {} - } - } - /** - * A fragment of the tgrace. + * A fragment of the trace. */ public data class Fragment(val duration: Long, val usage: Double) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 7fae918a..3690e681 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -22,10 +22,10 @@ package org.opendc.simulator.flow -import io.mockk.spyk -import io.mockk.verify +import io.mockk.* import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation @@ -122,7 +122,7 @@ internal class FlowForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify(exactly = 0) { consumer.onStop(any(), any(), any()) } } @Test @@ -139,8 +139,8 @@ internal class FlowForwarderTest { yield() forwarder.cancel() - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify(exactly = 1) { consumer.onStart(any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any(), any()) } } @Test @@ -157,8 +157,8 @@ internal class FlowForwarderTest { yield() source.cancel() - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify(exactly = 1) { consumer.onStart(any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any(), any()) } } @Test @@ -182,22 +182,23 @@ internal class FlowForwarderTest { } @Test + @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368 fun testAdjustCapacity() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 1.0) + val sink = FlowSink(engine, 1.0) - val consumer = spyk(FixedFlowSource(2.0, 1.0)) - source.startConsumer(forwarder) + val source = spyk(FixedFlowSource(2.0, 1.0)) + sink.startConsumer(forwarder) coroutineScope { - launch { forwarder.consume(consumer) } + launch { forwarder.consume(source) } delay(1000) - source.capacity = 0.5 + sink.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 1) { source.onPull(any(), any(), any()) } } @Test @@ -259,7 +260,7 @@ internal class FlowForwarderTest { } @Test - fun testEventFailure() = runBlockingSimulation { + fun testStartFailure() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val source = FlowSink(engine, 2000.0) @@ -272,7 +273,7 @@ internal class FlowForwarderTest { return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + override fun onStart(conn: FlowConnection, now: Long) { throw IllegalStateException("Test") } }) @@ -287,7 +288,7 @@ internal class FlowForwarderTest { } @Test - fun testEventConvergeFailure() = runBlockingSimulation { + fun testConvergeFailure() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val source = FlowSink(engine, 2000.0) @@ -300,10 +301,8 @@ internal class FlowForwarderTest { return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - if (event == FlowEvent.Converge) { - throw IllegalStateException("Test") - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + throw IllegalStateException("Test") } }) } catch (cause: Throwable) { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 5d579e5d..70c75864 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.flow -import io.mockk.every -import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* @@ -67,7 +65,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 3) { consumer.onPull(any(), any(), any()) } } @Test @@ -102,7 +100,7 @@ internal class FlowSinkTest { return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + override fun onStart(conn: FlowConnection, now: Long) { conn.pull() } } @@ -120,11 +118,8 @@ internal class FlowSinkTest { val consumer = object : FlowSource { var isFirst = true - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> resCtx = conn - else -> {} - } + override fun onStart(conn: FlowConnection, now: Long) { + resCtx = conn } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -154,9 +149,15 @@ internal class FlowSinkTest { val capacity = 4200.0 val provider = FlowSink(engine, capacity) - val consumer = mockk<FlowSource>(relaxUnitFun = true) - every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) } - .throws(IllegalStateException()) + val consumer = object : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + throw IllegalStateException("Hi") + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return Long.MAX_VALUE + } + } assertThrows<IllegalStateException> { provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt index c8627446..3475f027 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt @@ -108,11 +108,8 @@ internal class ExclusiveFlowMultiplexerTest { val workload = object : FlowSource { var isFirst = true - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> isFirst = true - else -> {} - } + override fun onStart(conn: FlowConnection, now: Long) { + isFirst = true } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt index 45d0bcf0..14d22162 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt @@ -108,7 +108,7 @@ class SimNetworkSinkTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify { consumer.onStart(any(), any()) } } @Test @@ -124,7 +124,7 @@ class SimNetworkSinkTest { assertFalse(sink.isConnected) assertFalse(source.isConnected) - verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify { consumer.onStop(any(), any(), any()) } } private class Source(engine: FlowEngine) : SimNetworkPort() { diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt index 4aa2fa92..62e54ffb 100644 --- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt +++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt @@ -50,7 +50,7 @@ class SimNetworkSwitchVirtualTest { assertTrue(source.isConnected) verify { source.createConsumer() } - verify { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify { consumer.onStart(any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt index ff447703..85b9ab01 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt @@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource import org.opendc.simulator.flow.source.FixedFlowSource @@ -87,7 +86,7 @@ internal class SimPduTest { outlet.connect(inlet) outlet.disconnect() - verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify { consumer.onStop(any(), any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt index b411e292..20677633 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource import org.opendc.simulator.flow.source.FixedFlowSource @@ -86,7 +85,7 @@ internal class SimPowerSourceTest { source.connect(inlet) source.disconnect() - verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify { consumer.onStop(any(), any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt index 31ac0b39..c6e0605a 100644 --- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt +++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource import org.opendc.simulator.flow.source.FixedFlowSource @@ -93,7 +92,7 @@ internal class SimUpsTest { ups.connect(inlet) ups.disconnect() - verify { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify { consumer.onStop(any(), any(), any()) } } class SimpleInlet : SimPowerInlet() { |
