summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-30 15:37:35 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:40 +0200
commita2ce07026bf3ef17326e72f395dfa2dd9d9b17be (patch)
tree347cd882148b43808bdd146fb7870b00103e5e6b
parent4f5a1f88d0c6aa19ce4cab0ec7b9b13a24c92fbe (diff)
refactor(simulator): Create separate callbacks for remaining events
This change creates separate callbacks for the remaining events: onStart, onStop and onConverge.
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt20
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt18
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt30
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt43
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt65
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt25
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt30
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt29
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt25
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt39
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt25
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt7
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt2
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt3
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt3
-rw-r--r--opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt3
20 files changed, 214 insertions, 243 deletions
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 6f460ef7..017bca59 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -128,6 +128,11 @@ public class SimTFDevice(
}
}
+ override fun onStart(conn: FlowConnection, now: Long) {
+ ctx = conn
+ capacity = conn.capacity
+ }
+
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val consumedWork = conn.rate * delta / 1000.0
@@ -157,18 +162,9 @@ public class SimTFDevice(
}
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> {
- ctx = conn
- capacity = conn.capacity
- }
- FlowEvent.Converge -> {
- _usage.record(conn.rate)
- _power.record(machine.psu.powerDraw)
- }
- else -> {}
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ _usage.record(conn.rate)
+ _power.record(machine.psu.powerDraw)
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
index b05d8ad9..8400c225 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/device/SimPsu.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute.device
import org.opendc.simulator.compute.power.PowerDriver
import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
import org.opendc.simulator.power.SimPowerInlet
import java.util.*
@@ -82,19 +81,22 @@ public class SimPsu(
}
override fun createConsumer(): FlowSource = object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _ctx = conn
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _ctx = null
+ }
+
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val powerDraw = computePowerDraw(_driver?.computePower() ?: 0.0)
conn.push(powerDraw)
return Long.MAX_VALUE
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> _ctx = conn
- FlowEvent.Converge -> _powerDraw = conn.rate
- FlowEvent.Exit -> _ctx = null
- else -> {}
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ _powerDraw = conn.rate
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index b85be39d..cc4f1f6a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -24,7 +24,6 @@ package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
/**
@@ -41,29 +40,14 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
*/
public fun waitFor(consumer: FlowSource): FlowSource {
waiting.add(consumer)
- return object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return try {
- consumer.onPull(conn, now, delta)
- } catch (cause: Throwable) {
- complete(consumer)
- throw cause
- }
- }
-
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ return object : FlowSource by consumer {
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
try {
- consumer.onEvent(conn, now, event)
-
- if (event == FlowEvent.Exit) {
- complete(consumer)
- }
- } catch (cause: Throwable) {
+ consumer.onStop(conn, now, delta)
+ } finally {
complete(consumer)
- throw cause
}
}
-
override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]"
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index df2c4fab..4685a755 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -83,20 +83,20 @@ public interface FlowConsumer {
public suspend fun FlowConsumer.consume(source: FlowSource) {
return suspendCancellableCoroutine { cont ->
startConsumer(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return try {
- source.onPull(conn, now, delta)
+ override fun onStart(conn: FlowConnection, now: Long) {
+ try {
+ source.onStart(conn, now)
} catch (cause: Throwable) {
cont.resumeWithException(cause)
throw cause
}
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
try {
- source.onEvent(conn, now, event)
+ source.onStop(conn, now, delta)
- if (event == FlowEvent.Exit && !cont.isCompleted) {
+ if (!cont.isCompleted) {
cont.resume(Unit)
}
} catch (cause: Throwable) {
@@ -105,6 +105,24 @@ public suspend fun FlowConsumer.consume(source: FlowSource) {
}
}
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return try {
+ source.onPull(conn, now, delta)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ source.onConverge(conn, now, delta)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
override fun toString(): String = "SuspendingFlowSource"
})
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
deleted file mode 100644
index bb6f25b1..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A flow event that is communicated to a [FlowSource].
- */
-public enum class FlowEvent {
- /**
- * This event is emitted to the source when it has started.
- */
- Start,
-
- /**
- * This event is emitted to the source when it is stopped.
- */
- Exit,
-
- /**
- * This event is emitted to the source when the system has converged into a steady state.
- */
- Converge,
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index bc01a11b..ab5b31c2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.flow
import mu.KotlinLogging
import org.opendc.simulator.flow.internal.FlowCountersImpl
+import kotlin.math.max
/**
* A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
@@ -64,6 +65,8 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
_innerCtx?.pull()
}
+ @JvmField var lastPull = Long.MAX_VALUE
+
override fun push(rate: Double) {
if (delegate == null) {
return
@@ -82,7 +85,9 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
reset()
if (hasDelegateStarted) {
- delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit)
+ val now = engine.clock.millis()
+ val delta = max(0, now - lastPull)
+ delegate.onStop(this, now, delta)
}
}
}
@@ -134,6 +139,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
}
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _innerCtx = conn
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _innerCtx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+
+ try {
+ delegate.onStop(this._ctx, now, delta)
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+ }
+ }
+ }
+
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val delegate = delegate
@@ -141,10 +165,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
start()
}
+ _ctx.lastPull = now
updateCounters(conn, delta)
return try {
- delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE
+ delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
@@ -153,32 +178,14 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> _innerCtx = conn
- FlowEvent.Exit -> {
- _innerCtx = null
-
- val delegate = delegate
- if (delegate != null) {
- reset()
-
- try {
- delegate.onEvent(this._ctx, now, FlowEvent.Exit)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- }
- }
- }
- else ->
- try {
- delegate?.onEvent(this._ctx, now, event)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- _innerCtx = null
- reset()
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate?.onConverge(this._ctx, now, delta)
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+
+ _innerCtx = null
+ reset()
}
}
@@ -189,7 +196,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
val delegate = delegate ?: return
try {
- delegate.onEvent(_ctx, engine.clock.millis(), FlowEvent.Start)
+ delegate.onStart(_ctx, engine.clock.millis())
hasDelegateStarted = true
} catch (cause: Throwable) {
logger.error(cause) { "Uncaught exception" }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 70687b4f..a4f624ef 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -30,6 +30,23 @@ package org.opendc.simulator.flow
*/
public interface FlowSource {
/**
+ * This method is invoked when the source is started.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the provider finished.
+ */
+ public fun onStart(conn: FlowConnection, now: Long) {}
+
+ /**
+ * This method is invoked when the source is finished.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the source finished.
+ * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
+ */
+ public fun onStop(conn: FlowConnection, now: Long, delta: Long) {}
+
+ /**
* This method is invoked when the source is pulled.
*
* @param conn The connection between the source and consumer.
@@ -40,11 +57,11 @@ public interface FlowSource {
public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
/**
- * This method is invoked when an event has occurred.
+ * This method is invoked when the flow graph has converged into a steady-state system.
*
* @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the event is occurring.
- * @param event The event that has occurred.
+ * @param now The virtual timestamp in milliseconds at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {}
+ public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index a86ed6ea..c235b9ae 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -125,7 +125,7 @@ internal class FlowConsumerContextImpl(
override fun start() {
check(_state == State.Pending) { "Consumer is already started" }
engine.batch {
- source.onEvent(this, _clock.millis(), FlowEvent.Start)
+ source.onStart(this, _clock.millis())
_state = State.Active
pull()
}
@@ -140,8 +140,7 @@ internal class FlowConsumerContextImpl(
_state = State.Closed
if (!_isUpdateActive) {
val now = _clock.millis()
- val delta = max(0, now - _lastPull)
- doStopSource(now, delta)
+ doStopSource(now)
// FIX: Make sure the context converges
pull()
@@ -210,19 +209,16 @@ internal class FlowConsumerContextImpl(
_isUpdateActive = true
_isImmediateUpdateScheduled = false
- val lastPush = _lastPush
- val pushDelta = max(0, now - lastPush)
-
try {
// Pull the source if (1) `pull` is called or (2) the timer of the source has expired
val deadline = if (_isPulled || _deadline == now) {
val lastPull = _lastPull
- val pullDelta = max(0, now - lastPull)
+ val delta = max(0, now - lastPull)
_isPulled = false
_lastPull = now
- val duration = source.onPull(this, now, pullDelta)
+ val duration = source.onPull(this, now, delta)
if (duration != Long.MAX_VALUE)
now + duration
else
@@ -236,12 +232,15 @@ internal class FlowConsumerContextImpl(
State.Active -> {
val demand = _demand
if (demand != _activeDemand) {
+ val lastPush = _lastPush
+ val delta = max(0, now - lastPush)
+
_lastPush = now
- logic.onPush(this, now, pushDelta, demand)
+ logic.onPush(this, now, delta, demand)
}
}
- State.Closed -> doStopSource(now, pushDelta)
+ State.Closed -> doStopSource(now)
State.Pending -> throw IllegalStateException("Illegal transition to pending state")
}
@@ -256,7 +255,7 @@ internal class FlowConsumerContextImpl(
// Schedule an update at the new deadline
scheduleDelayed(now, deadline)
} catch (cause: Throwable) {
- doFailSource(now, pushDelta, cause)
+ doFailSource(now, cause)
} finally {
_isUpdateActive = false
}
@@ -293,12 +292,12 @@ internal class FlowConsumerContextImpl(
try {
if (_state == State.Active) {
- source.onEvent(this, timestamp, FlowEvent.Converge)
+ source.onConverge(this, timestamp, delta)
}
logic.onConverge(this, timestamp, delta)
} catch (cause: Throwable) {
- doFailSource(timestamp, max(0, timestamp - _lastPull), cause)
+ doFailSource(timestamp, cause)
}
}
@@ -307,12 +306,13 @@ internal class FlowConsumerContextImpl(
/**
* Stop the [FlowSource].
*/
- private fun doStopSource(now: Long, delta: Long) {
+ private fun doStopSource(now: Long) {
try {
- source.onEvent(this, now, FlowEvent.Exit)
- logic.onFinish(this, now, delta, null)
+ source.onStop(this, now, max(0, now - _lastPull))
+ doFinishConsumer(now, null)
} catch (cause: Throwable) {
- doFailSource(now, delta, cause)
+ doFinishConsumer(now, cause)
+ return
} finally {
_deadline = Long.MAX_VALUE
_demand = 0.0
@@ -322,9 +322,24 @@ internal class FlowConsumerContextImpl(
/**
* Fail the [FlowSource].
*/
- private fun doFailSource(now: Long, delta: Long, cause: Throwable) {
+ private fun doFailSource(now: Long, cause: Throwable) {
+ try {
+ source.onStop(this, now, max(0, now - _lastPull))
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ doFinishConsumer(now, e)
+ } finally {
+ _deadline = Long.MAX_VALUE
+ _demand = 0.0
+ }
+ }
+
+ /**
+ * Finish the consumer.
+ */
+ private fun doFinishConsumer(now: Long, cause: Throwable?) {
try {
- logic.onFinish(this, now, delta, cause)
+ logic.onFinish(this, now, max(0, now - _lastPush), cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
logger.error(e) { "Uncaught exception" }
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index 811d9460..6dd9dcfb 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -88,13 +88,10 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul
_availableOutputs += forwarder
output.startConsumer(object : FlowSource by forwarder {
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- if (event == FlowEvent.Exit) {
- // De-register the output after it has finished
- _outputs -= output
- }
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _outputs -= output
- forwarder.onEvent(conn, now, event)
+ forwarder.onStop(conn, now, delta)
}
})
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index b98cf2f1..c7379fa9 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -372,6 +372,19 @@ public class MaxMinFlowMultiplexer(
provider.cancel()
}
+ override fun onStart(conn: FlowConnection, now: Long) {
+ assert(_ctx == null) { "Source running concurrently" }
+ _ctx = conn
+ capacity = conn.capacity
+ updateCapacity()
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _ctx = null
+ capacity = 0.0
+ updateCapacity()
+ }
+
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val capacity = capacity
if (capacity != conn.capacity) {
@@ -383,23 +396,6 @@ public class MaxMinFlowMultiplexer(
return Long.MAX_VALUE
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> {
- assert(_ctx == null) { "Source running concurrently" }
- _ctx = conn
- capacity = conn.capacity
- updateCapacity()
- }
- FlowEvent.Exit -> {
- _ctx = null
- capacity = 0.0
- updateCapacity()
- }
- else -> {}
- }
- }
-
override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 24ae64cb..0c39523f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.flow.source
import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
/**
@@ -48,28 +47,22 @@ public class FlowSourceRateAdapter(
callback(0.0)
}
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return try {
- delegate.onPull(conn, now, delta)
- } catch (cause: Throwable) {
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate.onStop(conn, now, delta)
+ } finally {
rate = 0.0
- throw cause
}
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- try {
- delegate.onEvent(conn, now, event)
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
+ }
- when (event) {
- FlowEvent.Converge -> rate = conn.rate
- FlowEvent.Exit -> rate = 0.0
- else -> {}
- }
- } catch (cause: Throwable) {
- rate = 0.0
- throw cause
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ delegate.onConverge(conn, now, delta)
+
+ rate = conn.rate
}
override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
index 4d3ae61a..ae537845 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.flow.source
import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
/**
@@ -33,6 +32,15 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource
private var _iterator: Iterator<Fragment>? = null
private var _nextTarget = Long.MIN_VALUE
+ override fun onStart(conn: FlowConnection, now: Long) {
+ check(_iterator == null) { "Source already running" }
+ _iterator = trace.iterator()
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _iterator = null
+ }
+
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
// Check whether the trace fragment was fully consumed, otherwise wait until we have done so
val nextTarget = _nextTarget
@@ -52,21 +60,8 @@ public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource
}
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> {
- check(_iterator == null) { "Source already running" }
- _iterator = trace.iterator()
- }
- FlowEvent.Exit -> {
- _iterator = null
- }
- else -> {}
- }
- }
-
/**
- * A fragment of the tgrace.
+ * A fragment of the trace.
*/
public data class Fragment(val duration: Long, val usage: Double)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index 7fae918a..3690e681 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -22,10 +22,10 @@
package org.opendc.simulator.flow
-import io.mockk.spyk
-import io.mockk.verify
+import io.mockk.*
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
@@ -122,7 +122,7 @@ internal class FlowForwarderTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify(exactly = 0) { consumer.onStop(any(), any(), any()) }
}
@Test
@@ -139,8 +139,8 @@ internal class FlowForwarderTest {
yield()
forwarder.cancel()
- verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
- verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify(exactly = 1) { consumer.onStart(any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
}
@Test
@@ -157,8 +157,8 @@ internal class FlowForwarderTest {
yield()
source.cancel()
- verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
- verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify(exactly = 1) { consumer.onStart(any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
}
@Test
@@ -182,22 +182,23 @@ internal class FlowForwarderTest {
}
@Test
+ @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
fun testAdjustCapacity() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val forwarder = FlowForwarder(engine)
- val source = FlowSink(engine, 1.0)
+ val sink = FlowSink(engine, 1.0)
- val consumer = spyk(FixedFlowSource(2.0, 1.0))
- source.startConsumer(forwarder)
+ val source = spyk(FixedFlowSource(2.0, 1.0))
+ sink.startConsumer(forwarder)
coroutineScope {
- launch { forwarder.consume(consumer) }
+ launch { forwarder.consume(source) }
delay(1000)
- source.capacity = 0.5
+ sink.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 1) { source.onPull(any(), any(), any()) }
}
@Test
@@ -259,7 +260,7 @@ internal class FlowForwarderTest {
}
@Test
- fun testEventFailure() = runBlockingSimulation {
+ fun testStartFailure() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val forwarder = FlowForwarder(engine)
val source = FlowSink(engine, 2000.0)
@@ -272,7 +273,7 @@ internal class FlowForwarderTest {
return Long.MAX_VALUE
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ override fun onStart(conn: FlowConnection, now: Long) {
throw IllegalStateException("Test")
}
})
@@ -287,7 +288,7 @@ internal class FlowForwarderTest {
}
@Test
- fun testEventConvergeFailure() = runBlockingSimulation {
+ fun testConvergeFailure() = runBlockingSimulation {
val engine = FlowEngineImpl(coroutineContext, clock)
val forwarder = FlowForwarder(engine)
val source = FlowSink(engine, 2000.0)
@@ -300,10 +301,8 @@ internal class FlowForwarderTest {
return Long.MAX_VALUE
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- if (event == FlowEvent.Converge) {
- throw IllegalStateException("Test")
- }
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ throw IllegalStateException("Test")
}
})
} catch (cause: Throwable) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
index 5d579e5d..70c75864 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.flow
-import io.mockk.every
-import io.mockk.mockk
import io.mockk.spyk
import io.mockk.verify
import kotlinx.coroutines.*
@@ -67,7 +65,7 @@ internal class FlowSinkTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ verify(exactly = 3) { consumer.onPull(any(), any(), any()) }
}
@Test
@@ -102,7 +100,7 @@ internal class FlowSinkTest {
return Long.MAX_VALUE
}
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ override fun onStart(conn: FlowConnection, now: Long) {
conn.pull()
}
}
@@ -120,11 +118,8 @@ internal class FlowSinkTest {
val consumer = object : FlowSource {
var isFirst = true
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> resCtx = conn
- else -> {}
- }
+ override fun onStart(conn: FlowConnection, now: Long) {
+ resCtx = conn
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
@@ -154,9 +149,15 @@ internal class FlowSinkTest {
val capacity = 4200.0
val provider = FlowSink(engine, capacity)
- val consumer = mockk<FlowSource>(relaxUnitFun = true)
- every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) }
- .throws(IllegalStateException())
+ val consumer = object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ throw IllegalStateException("Hi")
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+ }
assertThrows<IllegalStateException> {
provider.consume(consumer)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
index c8627446..3475f027 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
@@ -108,11 +108,8 @@ internal class ExclusiveFlowMultiplexerTest {
val workload = object : FlowSource {
var isFirst = true
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- when (event) {
- FlowEvent.Start -> isFirst = true
- else -> {}
- }
+ override fun onStart(conn: FlowConnection, now: Long) {
+ isFirst = true
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
index 45d0bcf0..14d22162 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSinkTest.kt
@@ -108,7 +108,7 @@ class SimNetworkSinkTest {
assertTrue(source.isConnected)
verify { source.createConsumer() }
- verify { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify { consumer.onStart(any(), any()) }
}
@Test
@@ -124,7 +124,7 @@ class SimNetworkSinkTest {
assertFalse(sink.isConnected)
assertFalse(source.isConnected)
- verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify { consumer.onStop(any(), any(), any()) }
}
private class Source(engine: FlowEngine) : SimNetworkPort() {
diff --git a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
index 4aa2fa92..62e54ffb 100644
--- a/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
+++ b/opendc-simulator/opendc-simulator-network/src/test/kotlin/org/opendc/simulator/network/SimNetworkSwitchVirtualTest.kt
@@ -50,7 +50,7 @@ class SimNetworkSwitchVirtualTest {
assertTrue(source.isConnected)
verify { source.createConsumer() }
- verify { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify { consumer.onStart(any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
index ff447703..85b9ab01 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPduTest.kt
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
import org.opendc.simulator.flow.source.FixedFlowSource
@@ -87,7 +86,7 @@ internal class SimPduTest {
outlet.connect(inlet)
outlet.disconnect()
- verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify { consumer.onStop(any(), any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
index b411e292..20677633 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimPowerSourceTest.kt
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
import org.opendc.simulator.flow.source.FixedFlowSource
@@ -86,7 +85,7 @@ internal class SimPowerSourceTest {
source.connect(inlet)
source.disconnect()
- verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify { consumer.onStop(any(), any(), any()) }
}
@Test
diff --git a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
index 31ac0b39..c6e0605a 100644
--- a/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
+++ b/opendc-simulator/opendc-simulator-power/src/test/kotlin/org/opendc/simulator/power/SimUpsTest.kt
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowEvent
import org.opendc.simulator.flow.FlowSource
import org.opendc.simulator.flow.source.FixedFlowSource
@@ -93,7 +92,7 @@ internal class SimUpsTest {
ups.connect(inlet)
ups.disconnect()
- verify { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ verify { consumer.onStop(any(), any(), any()) }
}
class SimpleInlet : SimPowerInlet() {