summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt45
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt52
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt35
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt5
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt34
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt48
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt32
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt19
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt10
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt20
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt12
16 files changed, 239 insertions, 153 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index f4459c54..1bcaf45f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -96,10 +96,8 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
override fun onIdle(deadline: Long) = doIdle(deadline)
- override fun onFinish(cause: Throwable?) {
- doFinish(cause)
-
- super.onFinish(cause)
+ override fun onFinish() {
+ doFinish(null)
}
}
@@ -134,6 +132,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
*/
private var command: SimResourceCommand? = null
+ private fun updateCapacity() {
+ // Adjust capacity of output resource
+ context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 }
+ }
+
/* Input */
override fun push(command: SimResourceCommand) {
this.command = command
@@ -141,18 +144,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
}
/* SimResourceConsumer */
- override fun onStart(ctx: SimResourceContext) {
- _ctx = ctx
- onCapacityChanged(ctx, false)
-
- // Make sure we initialize the output if we have not done so yet
- if (context.state == SimResourceState.Pending) {
- context.start()
- }
-
- onInputStarted(this)
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
var next = command
@@ -167,13 +158,23 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
}
}
- override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- // Adjust capacity of output resource
- context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 }
- }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ _ctx = ctx
+ updateCapacity()
+
+ // Make sure we initialize the output if we have not done so yet
+ if (context.state == SimResourceState.Pending) {
+ context.start()
+ }
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- onInputFinished(this)
+ onInputStarted(this)
+ }
+ SimResourceEvent.Capacity -> updateCapacity()
+ SimResourceEvent.Exit -> onInputFinished(this)
+ else -> {}
+ }
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 05ed0714..d2f585b1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -75,7 +75,7 @@ public abstract class SimAbstractResourceContext(
/**
* The current processing speed of the resource.
*/
- public var speed: Double = 0.0
+ final override var speed: Double = 0.0
private set
/**
@@ -92,9 +92,7 @@ public abstract class SimAbstractResourceContext(
/**
* This method is invoked when the resource consumer has finished.
*/
- public open fun onFinish(cause: Throwable?) {
- consumer.onFinish(this, cause)
- }
+ public abstract fun onFinish()
/**
* Get the remaining work to process after a resource consumption.
@@ -126,10 +124,10 @@ public abstract class SimAbstractResourceContext(
latestFlush = now
try {
- consumer.onStart(this)
+ consumer.onEvent(this, SimResourceEvent.Start)
activeCommand = interpret(consumer.onNext(this), now)
} catch (cause: Throwable) {
- doStop(cause)
+ doFail(cause)
} finally {
isProcessing = false
}
@@ -144,9 +142,9 @@ public abstract class SimAbstractResourceContext(
latestFlush = clock.millis()
flush(isIntermediate = true)
- doStop(null)
+ doStop()
} catch (cause: Throwable) {
- doStop(cause)
+ doFail(cause)
} finally {
isProcessing = false
}
@@ -214,7 +212,7 @@ public abstract class SimAbstractResourceContext(
// Flush remaining work cache
_remainingWorkFlush = Long.MIN_VALUE
} catch (cause: Throwable) {
- doStop(cause)
+ doFail(cause)
} finally {
latestFlush = now
isProcessing = false
@@ -251,13 +249,18 @@ public abstract class SimAbstractResourceContext(
/**
* Finish the consumer and resource provider.
*/
- private fun doStop(cause: Throwable?) {
+ private fun doStop() {
val state = state
this.state = SimResourceState.Stopped
if (state == SimResourceState.Active) {
activeCommand = null
- onFinish(cause)
+ try {
+ consumer.onEvent(this, SimResourceEvent.Exit)
+ onFinish()
+ } catch (cause: Throwable) {
+ doFail(cause)
+ }
}
}
@@ -272,9 +275,9 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = 0.0
- consumer.onConfirm(this, 0.0)
onIdle(deadline)
+ consumer.onEvent(this, SimResourceEvent.Run)
}
is SimResourceCommand.Consume -> {
val work = command.work
@@ -284,14 +287,13 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = min(capacity, limit)
- consumer.onConfirm(this, speed)
-
onConsume(work, limit, deadline)
+ consumer.onEvent(this, SimResourceEvent.Run)
}
is SimResourceCommand.Exit -> {
speed = 0.0
- doStop(null)
+ doStop()
// No need to set the next active command
return null
@@ -319,6 +321,23 @@ public abstract class SimAbstractResourceContext(
}
/**
+ * Fail the resource consumer.
+ */
+ private fun doFail(cause: Throwable) {
+ state = SimResourceState.Stopped
+ activeCommand = null
+
+ try {
+ consumer.onFailure(this, cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ e.printStackTrace()
+ }
+
+ onFinish()
+ }
+
+ /**
* Indicate that the capacity of the resource has changed.
*/
private fun onCapacityChange() {
@@ -328,7 +347,8 @@ public abstract class SimAbstractResourceContext(
}
val isThrottled = speed > capacity
- consumer.onCapacityChanged(this, isThrottled)
+
+ consumer.onEvent(this, SimResourceEvent.Capacity)
// Optimization: only flush changes if the new capacity cannot satisfy the active resource command.
// Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush().
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 38672b13..4d937514 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -30,13 +30,6 @@ package org.opendc.simulator.resources
*/
public interface SimResourceConsumer {
/**
- * This method is invoked when the consumer is started for some resource.
- *
- * @param ctx The execution context in which the consumer runs.
- */
- public fun onStart(ctx: SimResourceContext) {}
-
- /**
* This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
* the resource finished processing, reached its deadline or was interrupted.
*
@@ -46,34 +39,18 @@ public interface SimResourceConsumer {
public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
- * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ * This method is invoked when an event has occurred.
*
* @param ctx The execution context in which the consumer runs.
- * @param speed The speed at which the consumer runs.
+ * @param event The event that has occurred.
*/
- public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+ public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {}
/**
- * This is method is invoked when the capacity of the resource changes.
- *
- * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
- * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly
- * causing the active resource command to finish at a later moment than initially planned.
+ * This method is invoked when a resource consumer throws an exception.
*
* @param ctx The execution context in which the consumer runs.
- * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the
- * capacity change.
- */
- public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {}
-
- /**
- * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit],
- * the resource finished itself, or a failure occurred at the resource.
- *
- * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider.
- *
- * @param ctx The execution context in which the consumer ran.
- * @param cause The cause of the finish in case the resource finished exceptionally.
+ * @param cause The cause of the failure.
*/
- public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {}
+ public fun onFailure(ctx: SimResourceContext, cause: Throwable) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index 11dbb09f..7c76c634 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -40,6 +40,11 @@ public interface SimResourceContext {
public val capacity: Double
/**
+ * The resource processing speed at this instant.
+ */
+ public val speed: Double
+
+ /**
* The amount of work still remaining at this instant.
*/
public val remainingWork: Double
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index dfdd2c2e..8128c98b 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -90,26 +90,28 @@ public class SimResourceDistributorMaxMin(
val remainingWork: Double
get() = ctx.remainingWork
- override fun onStart(ctx: SimResourceContext) {
- this.ctx = ctx
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
return doNext(ctx.capacity)
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- super.onFinish(ctx, cause)
-
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
- output.close()
+ output.close()
+ }
+ }
+ else -> {}
}
}
}
@@ -370,13 +372,11 @@ public class SimResourceDistributorMaxMin(
activeCommand = SimResourceCommand.Consume(work, limit, deadline)
}
- override fun onFinish(cause: Throwable?) {
+ override fun onFinish() {
reportOvercommit()
activeCommand = SimResourceCommand.Exit
provider.cancel()
-
- super.onFinish(cause)
}
override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt
new file mode 100644
index 00000000..959427f1
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.resources
+
+/**
+ * A resource event that is communicated to the resource consumer.
+ */
+public enum class SimResourceEvent {
+ /**
+ * This event is emitted to the consumer when it has started.
+ */
+ Start,
+
+ /**
+ * This event is emitted to the consumer when it has exited.
+ */
+ Exit,
+
+ /**
+ * This event is emitted to the consumer when it has started a new resource consumption or idle cycle.
+ */
+ Run,
+
+ /**
+ * This event is emitted to the consumer when the capacity of the resource has changed.
+ */
+ Capacity,
+}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
index 52b13c5c..2f567a5e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt
@@ -23,6 +23,8 @@
package org.opendc.simulator.resources
import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
/**
* A [SimResourceProvider] provides some resource of type [R].
@@ -65,15 +67,27 @@ public interface SimResourceProvider : AutoCloseable {
public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) {
return suspendCancellableCoroutine { cont ->
startConsumer(object : SimResourceConsumer by consumer {
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- assert(!cont.isCompleted) { "Coroutine already completed" }
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ consumer.onEvent(ctx, event)
- consumer.onFinish(ctx, cause)
+ if (event == SimResourceEvent.Exit && !cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ }
- cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit))
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
+ try {
+ consumer.onFailure(ctx, cause)
+ cont.resumeWithException(cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ cont.resumeWithException(e)
+ }
}
override fun toString(): String = "SimSuspendingResourceConsumer"
})
+
+ cont.invokeOnCancellation { cancel() }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 157db3cb..fe569096 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -110,7 +110,7 @@ public class SimResourceSource(
scheduler.startSingleTimerTo(this, until, ::flush)
}
- override fun onFinish(cause: Throwable?) {
+ override fun onFinish() {
scheduler.cancel(this)
ctx = null
@@ -118,8 +118,6 @@ public class SimResourceSource(
if (this@SimResourceSource.state != SimResourceState.Stopped) {
this@SimResourceSource.state = SimResourceState.Pending
}
-
- super.onFinish(cause)
}
override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]"
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
index 45e4c220..1a9dd0bc 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt
@@ -66,10 +66,13 @@ public class SimResourceSwitchExclusive : SimResourceSwitch {
availableResources += forwarder
input.startConsumer(object : SimResourceConsumer by forwarder {
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- // De-register the input after it has finished
- _inputs -= input
- forwarder.onFinish(ctx, cause)
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ if (event == SimResourceEvent.Exit) {
+ // De-register the input after it has finished
+ _inputs -= input
+ }
+
+ forwarder.onEvent(ctx, event)
}
})
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index de455021..32f3f573 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -75,7 +75,7 @@ public class SimResourceTransformer(
if (delegate != null && ctx != null) {
this.delegate = null
- delegate.onFinish(ctx)
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
}
}
@@ -90,10 +90,6 @@ public class SimResourceTransformer(
}
}
- override fun onStart(ctx: SimResourceContext) {
- this.ctx = ctx
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val delegate = delegate
@@ -110,7 +106,7 @@ public class SimResourceTransformer(
// reset beforehand the existing state and check whether it has been updated afterwards
reset()
- delegate.onFinish(ctx)
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
if (isCoupled || state == SimResourceState.Stopped)
SimResourceCommand.Exit
@@ -124,21 +120,31 @@ public class SimResourceTransformer(
}
}
- override fun onConfirm(ctx: SimResourceContext, speed: Double) {
- delegate?.onConfirm(ctx, speed)
- }
-
- override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- delegate?.onCapacityChanged(ctx, isThrottled)
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ }
+ SimResourceEvent.Exit -> {
+ this.ctx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+ delegate.onEvent(ctx, SimResourceEvent.Exit)
+ }
+ }
+ else -> delegate?.onEvent(ctx, event)
+ }
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
this.ctx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onFinish(ctx, cause)
+ delegate.onFailure(ctx, cause)
}
}
@@ -147,7 +153,7 @@ public class SimResourceTransformer(
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onStart(checkNotNull(ctx))
+ delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start)
hasDelegateStarted = true
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index 114c7312..4f4ebb14 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
import kotlin.math.min
/**
@@ -53,28 +54,29 @@ public class SimSpeedConsumerAdapter(
return delegate.onNext(ctx)
}
- override fun onConfirm(ctx: SimResourceContext, speed: Double) {
- delegate.onConfirm(ctx, speed)
-
- this.speed = speed
- }
-
- override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
val oldSpeed = speed
- delegate.onCapacityChanged(ctx, isThrottled)
+ delegate.onEvent(ctx, event)
- // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
- // need to update the current speed.
- if (oldSpeed == speed) {
- speed = min(ctx.capacity, speed)
+ when (event) {
+ SimResourceEvent.Run -> speed = ctx.speed
+ SimResourceEvent.Capacity -> {
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
+ }
+ }
+ SimResourceEvent.Exit -> speed = 0.0
+ else -> {}
}
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- super.onFinish(ctx, cause)
-
+ override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
speed = 0.0
+
+ delegate.onFailure(ctx, cause)
}
override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index a52d1d5d..2e94e1c1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
+import org.opendc.simulator.resources.SimResourceEvent
/**
* A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -33,11 +34,6 @@ import org.opendc.simulator.resources.SimResourceContext
public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
private var iterator: Iterator<Fragment>? = null
- override fun onStart(ctx: SimResourceContext) {
- check(iterator == null) { "Consumer already running" }
- iterator = trace.iterator()
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
@@ -57,8 +53,17 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
}
}
- override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- iterator = null
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ check(iterator == null) { "Consumer already running" }
+ iterator = trace.iterator()
+ }
+ SimResourceEvent.Exit -> {
+ iterator = null
+ }
+ else -> {}
+ }
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index be909556..8c15ec71 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -40,7 +40,7 @@ class SimResourceContextTest {
val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
}
context.flush()
@@ -53,7 +53,7 @@ class SimResourceContextTest {
val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
})
@@ -71,7 +71,7 @@ class SimResourceContextTest {
val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
})
@@ -83,7 +83,7 @@ class SimResourceContextTest {
assertAll(
{ verify(exactly = 2) { context.onIdle(any()) } },
- { verify(exactly = 1) { context.onFinish(null) } }
+ { verify(exactly = 1) { context.onFinish() } }
)
}
@@ -94,7 +94,7 @@ class SimResourceContextTest {
val context = object : SimAbstractResourceContext(4200.0, clock, consumer) {
override fun onIdle(deadline: Long) {}
- override fun onFinish(cause: Throwable?) {}
+ override fun onFinish() {}
override fun onConsume(work: Double, limit: Double, deadline: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
index 39f74481..361a1516 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt
@@ -77,7 +77,7 @@ class SimResourceSourceTest {
provider.capacity = 0.5
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
} finally {
scheduler.close()
provider.close()
@@ -119,13 +119,13 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, clock, scheduler)
val consumer = object : SimResourceConsumer {
- override fun onStart(ctx: SimResourceContext) {
- ctx.interrupt()
- }
-
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
return SimResourceCommand.Exit
}
+
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ ctx.interrupt()
+ }
}
try {
@@ -145,8 +145,12 @@ class SimResourceSourceTest {
val consumer = object : SimResourceConsumer {
var isFirst = true
- override fun onStart(ctx: SimResourceContext) {
- resCtx = ctx
+
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> resCtx = ctx
+ else -> {}
+ }
}
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
@@ -181,7 +185,7 @@ class SimResourceSourceTest {
val provider = SimResourceSource(capacity, clock, scheduler)
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
- every { consumer.onStart(any()) }
+ every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) }
.throws(IllegalStateException())
try {
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
index f7d17867..1b1f7790 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt
@@ -120,8 +120,11 @@ internal class SimResourceSwitchExclusiveTest {
val workload = object : SimResourceConsumer {
var isFirst = true
- override fun onStart(ctx: SimResourceContext) {
- isFirst = true
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> isFirst = true
+ else -> {}
+ }
}
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index d2ad73bc..e3ca5845 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -118,7 +118,7 @@ internal class SimResourceTransformerTest {
forwarder.startConsumer(consumer)
forwarder.cancel()
- verify(exactly = 0) { consumer.onFinish(any(), null) }
+ verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) }
}
@Test
@@ -136,8 +136,8 @@ internal class SimResourceTransformerTest {
yield()
forwarder.cancel()
- verify(exactly = 1) { consumer.onStart(any()) }
- verify(exactly = 1) { consumer.onFinish(any(), null) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
}
@Test
@@ -155,8 +155,8 @@ internal class SimResourceTransformerTest {
yield()
source.cancel()
- verify(exactly = 1) { consumer.onStart(any()) }
- verify(exactly = 1) { consumer.onFinish(any(), null) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) }
}
@Test
@@ -191,7 +191,7 @@ internal class SimResourceTransformerTest {
}
assertEquals(3000, clock.millis())
- verify(exactly = 1) { consumer.onCapacityChanged(any(), true) }
+ verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) }
}
@Test