summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-30 14:35:30 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:40 +0200
commitb0fc93f818e5e735e972a04f5aa49e0ebe1de181 (patch)
tree354e285fec462cb61b7f36c48b4d377b0a387b8d
parent7b3a31b11df76870b965748fd8f7e712682a9d30 (diff)
refactor(simulator): Remove failure callback from FlowSource
This change removes the `onFailure` method from FlowSource. Instead, the FlowConsumer will receive the reason for failure of the source.
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt22
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt5
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt85
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt32
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt44
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt31
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt96
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt)2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt)2
-rw-r--r--opendc-simulator/opendc-simulator-network/build.gradle.kts2
-rw-r--r--opendc-simulator/opendc-simulator-power/build.gradle.kts2
17 files changed, 232 insertions, 134 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index e2290a14..a2bb89c2 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -36,4 +36,6 @@ dependencies {
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcUtils)
+
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
index dabe60e0..b85be39d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt
@@ -41,22 +41,26 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) {
*/
public fun waitFor(consumer: FlowSource): FlowSource {
waiting.add(consumer)
- return object : FlowSource by consumer {
+ return object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return try {
+ consumer.onPull(conn, now, delta)
+ } catch (cause: Throwable) {
+ complete(consumer)
+ throw cause
+ }
+ }
+
override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
try {
consumer.onEvent(conn, now, event)
- } finally {
+
if (event == FlowEvent.Exit) {
complete(consumer)
}
- }
- }
-
- override fun onFailure(conn: FlowConnection, cause: Throwable) {
- try {
- consumer.onFailure(conn, cause)
- } finally {
+ } catch (cause: Throwable) {
complete(consumer)
+ throw cause
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
index 5a956fee..05e21c3c 100644
--- a/opendc-simulator/opendc-simulator-flow/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -32,7 +32,8 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
api(libs.kotlinx.coroutines)
- implementation(projects.opendcUtils)
+ implementation(libs.kotlin.logging)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
index 3a6e2e97..df2c4fab 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -82,22 +82,26 @@ public interface FlowConsumer {
*/
public suspend fun FlowConsumer.consume(source: FlowSource) {
return suspendCancellableCoroutine { cont ->
- startConsumer(object : FlowSource by source {
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- source.onEvent(conn, now, event)
-
- if (event == FlowEvent.Exit && !cont.isCompleted) {
- cont.resume(Unit)
+ startConsumer(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return try {
+ source.onPull(conn, now, delta)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
}
}
- override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
try {
- source.onFailure(conn, cause)
+ source.onEvent(conn, now, event)
+
+ if (event == FlowEvent.Exit && !cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
cont.resumeWithException(cause)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- cont.resumeWithException(e)
+ throw cause
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
index c69cb17e..ef94ab22 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -46,11 +46,12 @@ public interface FlowConsumerLogic {
public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
/**
- * This method is invoked when the [FlowSource] is completed.
+ * This method is invoked when the [FlowSource] completed or failed.
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the provider finished.
* @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ * @param cause The cause of the failure or `null` if the source completed.
*/
- public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 2074033e..bc01a11b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.flow
+import mu.KotlinLogging
import org.opendc.simulator.flow.internal.FlowCountersImpl
/**
@@ -32,6 +33,11 @@ import org.opendc.simulator.flow.internal.FlowCountersImpl
*/
public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable {
/**
+ * The logging instance of this connection.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The delegate [FlowSource].
*/
private var delegate: FlowSource? = null
@@ -59,23 +65,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
override fun push(rate: Double) {
+ if (delegate == null) {
+ return
+ }
+
_innerCtx?.push(rate)
_demand = rate
}
override fun close() {
- val delegate = checkNotNull(delegate) { "Delegate not active" }
-
- if (isCoupled)
- _innerCtx?.close()
- else
- _innerCtx?.push(0.0)
+ val delegate = delegate ?: return
+ val hasDelegateStarted = hasDelegateStarted
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
reset()
- delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit)
+ if (hasDelegateStarted) {
+ delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit)
+ }
}
}
@@ -114,16 +122,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
override fun cancel() {
- val delegate = delegate
- val ctx = _innerCtx
-
- if (delegate != null) {
- this.delegate = null
-
- if (ctx != null) {
- delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit)
- }
- }
+ _ctx.close()
}
override fun close() {
@@ -144,34 +143,42 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
updateCounters(conn, delta)
- return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE
+ return try {
+ delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+
+ reset()
+ Long.MAX_VALUE
+ }
}
override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
when (event) {
- FlowEvent.Start -> {
- _innerCtx = conn
- }
+ FlowEvent.Start -> _innerCtx = conn
FlowEvent.Exit -> {
_innerCtx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onEvent(this._ctx, now, FlowEvent.Exit)
+
+ try {
+ delegate.onEvent(this._ctx, now, FlowEvent.Exit)
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+ }
}
}
- else -> delegate?.onEvent(this._ctx, now, event)
- }
- }
-
- override fun onFailure(conn: FlowConnection, cause: Throwable) {
- _innerCtx = null
+ else ->
+ try {
+ delegate?.onEvent(this._ctx, now, event)
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
- val delegate = delegate
- if (delegate != null) {
- reset()
- delegate.onFailure(this._ctx, cause)
+ _innerCtx = null
+ reset()
+ }
}
}
@@ -180,15 +187,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start)
- hasDelegateStarted = true
+ try {
+ delegate.onEvent(_ctx, engine.clock.millis(), FlowEvent.Start)
+ hasDelegateStarted = true
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+ reset()
+ }
}
/**
* Reset the delegate.
*/
private fun reset() {
+ if (isCoupled)
+ _innerCtx?.close()
+ else
+ _innerCtx?.push(0.0)
+
delegate = null
hasDelegateStarted = false
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index fb6ca85d..fc590177 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -46,7 +46,7 @@ public class FlowSink(
updateCounters(ctx, delta)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
updateCounters(ctx, delta)
cancel()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
index 077b4d38..70687b4f 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -47,12 +47,4 @@ public interface FlowSource {
* @param event The event that has occurred.
*/
public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {}
-
- /**
- * This method is invoked when the source throws an exception.
- *
- * @param conn The connection between the source and consumer.
- * @param cause The cause of the failure.
- */
- public fun onFailure(conn: FlowConnection, cause: Throwable) {}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index fc9c8059..a74f89b4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.flow.internal
+import mu.KotlinLogging
import org.opendc.simulator.flow.*
import java.util.ArrayDeque
import kotlin.math.max
@@ -36,6 +37,11 @@ internal class FlowConsumerContextImpl(
private val logic: FlowConsumerLogic
) : FlowConsumerContext {
/**
+ * The logging instance of this connection.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* The capacity of the connection.
*/
override var capacity: Double = 0.0
@@ -131,7 +137,7 @@ internal class FlowConsumerContextImpl(
if (!_isUpdateActive) {
val now = _clock.millis()
val delta = max(0, now - _lastPull)
- doStop(now, delta)
+ doStopSource(now, delta)
// FIX: Make sure the context converges
pull()
@@ -231,7 +237,7 @@ internal class FlowConsumerContextImpl(
logic.onPush(this, now, pushDelta, demand)
}
}
- State.Closed -> doStop(now, pushDelta)
+ State.Closed -> doStopSource(now, pushDelta)
State.Pending -> throw IllegalStateException("Illegal transition to pending state")
}
@@ -246,7 +252,7 @@ internal class FlowConsumerContextImpl(
// Schedule an update at the new deadline
scheduleDelayed(now, deadline)
} catch (cause: Throwable) {
- doFail(now, pushDelta, cause)
+ doFailSource(now, pushDelta, cause)
} finally {
_isUpdateActive = false
}
@@ -288,21 +294,21 @@ internal class FlowConsumerContextImpl(
logic.onConverge(this, timestamp, delta)
} catch (cause: Throwable) {
- doFail(timestamp, max(0, timestamp - _lastPull), cause)
+ doFailSource(timestamp, max(0, timestamp - _lastPull), cause)
}
}
override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
/**
- * Stop the resource context.
+ * Stop the [FlowSource].
*/
- private fun doStop(now: Long, delta: Long) {
+ private fun doStopSource(now: Long, delta: Long) {
try {
source.onEvent(this, now, FlowEvent.Exit)
- logic.onFinish(this, now, delta)
+ logic.onFinish(this, now, delta, null)
} catch (cause: Throwable) {
- doFail(now, delta, cause)
+ doFailSource(now, delta, cause)
} finally {
_deadline = Long.MAX_VALUE
_demand = 0.0
@@ -310,17 +316,15 @@ internal class FlowConsumerContextImpl(
}
/**
- * Fail the resource consumer.
+ * Fail the [FlowSource].
*/
- private fun doFail(now: Long, delta: Long, cause: Throwable) {
+ private fun doFailSource(now: Long, delta: Long, cause: Throwable) {
try {
- source.onFailure(this, cause)
+ logic.onFinish(this, now, delta, cause)
} catch (e: Throwable) {
e.addSuppressed(cause)
- e.printStackTrace()
+ logger.error(e) { "Uncaught exception" }
}
-
- logic.onFinish(this, now, delta)
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 9735f121..a3e108f6 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -292,7 +292,7 @@ public class MaxMinFlowMultiplexer(
parent?.onConverge(now)
}
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
doUpdateCounters(delta)
limit = 0.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
index 7fcc0405..fcee3906 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -28,7 +28,7 @@ import org.opendc.simulator.flow.FlowSource
import kotlin.math.min
/**
- * Helper class to expose an observable [speed] field describing the speed of the consumer.
+ * Helper class to expose an observable [rate] field describing the flow rate of the source.
*/
public class FlowSourceRateAdapter(
private val delegate: FlowSource,
@@ -37,7 +37,7 @@ public class FlowSourceRateAdapter(
/**
* The resource processing speed at this instant.
*/
- public var speed: Double = 0.0
+ public var rate: Double = 0.0
private set(value) {
if (field != value) {
callback(value)
@@ -50,33 +50,37 @@ public class FlowSourceRateAdapter(
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- return delegate.onPull(conn, now, delta)
+ return try {
+ delegate.onPull(conn, now, delta)
+ } catch (cause: Throwable) {
+ rate = 0.0
+ throw cause
+ }
}
override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- val oldSpeed = speed
+ val oldSpeed = rate
- delegate.onEvent(conn, now, event)
+ try {
+ delegate.onEvent(conn, now, event)
- when (event) {
- FlowEvent.Converge -> speed = conn.rate
- FlowEvent.Capacity -> {
- // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
- // need to update the current speed.
- if (oldSpeed == speed) {
- speed = min(conn.capacity, speed)
+ when (event) {
+ FlowEvent.Converge -> rate = conn.rate
+ FlowEvent.Capacity -> {
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == rate) {
+ rate = min(conn.capacity, rate)
+ }
}
+ FlowEvent.Exit -> rate = 0.0
+ else -> {}
}
- FlowEvent.Exit -> speed = 0.0
- else -> {}
+ } catch (cause: Throwable) {
+ rate = 0.0
+ throw cause
}
}
- override fun onFailure(conn: FlowConnection, cause: Throwable) {
- speed = 0.0
-
- delegate.onFailure(conn, cause)
- }
-
override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
index 380fd38a..f1a5cbe4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -23,7 +23,6 @@
package org.opendc.simulator.flow
import io.mockk.*
-import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
@@ -102,34 +101,4 @@ class FlowConsumerContextTest {
verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
}
-
- @Test
- fun testFailureNoInfiniteLoop() = runBlockingSimulation {
- val engine = FlowEngineImpl(coroutineContext, clock)
-
- val consumer = spyk(object : FlowSource {
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- conn.close()
- return Long.MAX_VALUE
- }
-
- override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
- if (event == FlowEvent.Exit) throw IllegalStateException("onEvent")
- }
-
- override fun onFailure(conn: FlowConnection, cause: Throwable) {
- throw IllegalStateException("onFailure")
- }
- })
-
- val logic = object : FlowConsumerLogic {}
-
- val context = FlowConsumerContextImpl(engine, consumer, logic)
-
- context.start()
-
- delay(1)
-
- verify(exactly = 1) { consumer.onFailure(any(), any()) }
- }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
index cbc48a4e..d125c638 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -219,4 +219,100 @@ internal class FlowForwarderTest {
assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
assertEquals(2000, clock.millis())
}
+
+ @Test
+ fun testCoupledExit() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(FixedFlowSource(2000.0, 1.0))
+
+ yield()
+
+ assertFalse(source.isActive)
+ }
+
+ @Test
+ fun testPullFailureCoupled() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertFalse(source.isActive)
+ }
+
+ @Test
+ fun testEventFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertTrue(source.isActive)
+ source.cancel()
+ }
+
+ @Test
+ fun testEventConvergeFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Converge) {
+ throw IllegalStateException("Test")
+ }
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertTrue(source.isActive)
+ source.cancel()
+ }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
index b503087e..c8627446 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt
@@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource
/**
* Test suite for the [ForwardingFlowMultiplexer] class.
*/
-internal class SimResourceSwitchExclusiveTest {
+internal class ExclusiveFlowMultiplexerTest {
/**
* Test a trace workload.
*/
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
index 089a8d78..9f6b8a2c 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -37,7 +37,7 @@ import org.opendc.simulator.flow.source.TraceFlowSource
/**
* Test suite for the [FlowMultiplexer] implementations
*/
-internal class SimResourceSwitchMaxMinTest {
+internal class MaxMinFlowMultiplexerTest {
@Test
fun testSmoke() = runBlockingSimulation {
val scheduler = FlowEngineImpl(coroutineContext, clock)
diff --git a/opendc-simulator/opendc-simulator-network/build.gradle.kts b/opendc-simulator/opendc-simulator-network/build.gradle.kts
index a8f94602..f8931053 100644
--- a/opendc-simulator/opendc-simulator-network/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-network/build.gradle.kts
@@ -32,4 +32,6 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
+
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-simulator/opendc-simulator-power/build.gradle.kts b/opendc-simulator/opendc-simulator-power/build.gradle.kts
index e4342a6a..5d8c8949 100644
--- a/opendc-simulator/opendc-simulator-power/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-power/build.gradle.kts
@@ -32,4 +32,6 @@ dependencies {
api(platform(projects.opendcPlatform))
api(projects.opendcSimulator.opendcSimulatorFlow)
implementation(projects.opendcSimulator.opendcSimulatorCore)
+
+ testImplementation(libs.slf4j.simple)
}