diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-22 17:18:59 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-05-03 21:17:48 +0200 |
| commit | 980b016452b3889585feaf2dbbe3244c921123b0 (patch) | |
| tree | 845a418358c656f2917f7cc9127b3141d1ce48b5 /opendc-simulator/opendc-simulator-resources/src | |
| parent | 5c8cecaf5b8d24ffcd99ce45b922c5a853bd492d (diff) | |
simulator: Add generic approach for reporting resource events
This change introduces a generic approach for reporting resource events
to resource consumers. This way we reduce the boilerplate of the
SimResourceConsumer interface.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src')
16 files changed, 239 insertions, 153 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index f4459c54..1bcaf45f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -96,10 +96,8 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : override fun onIdle(deadline: Long) = doIdle(deadline) - override fun onFinish(cause: Throwable?) { - doFinish(cause) - - super.onFinish(cause) + override fun onFinish() { + doFinish(null) } } @@ -134,6 +132,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : */ private var command: SimResourceCommand? = null + private fun updateCapacity() { + // Adjust capacity of output resource + context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } + } + /* Input */ override fun push(command: SimResourceCommand) { this.command = command @@ -141,18 +144,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : } /* SimResourceConsumer */ - override fun onStart(ctx: SimResourceContext) { - _ctx = ctx - onCapacityChanged(ctx, false) - - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - - onInputStarted(this) - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { var next = command @@ -167,13 +158,23 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : } } - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - // Adjust capacity of output resource - context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } - } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + _ctx = ctx + updateCapacity() + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - onInputFinished(this) + onInputStarted(this) + } + SimResourceEvent.Capacity -> updateCapacity() + SimResourceEvent.Exit -> onInputFinished(this) + else -> {} + } } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 05ed0714..d2f585b1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -75,7 +75,7 @@ public abstract class SimAbstractResourceContext( /** * The current processing speed of the resource. */ - public var speed: Double = 0.0 + final override var speed: Double = 0.0 private set /** @@ -92,9 +92,7 @@ public abstract class SimAbstractResourceContext( /** * This method is invoked when the resource consumer has finished. */ - public open fun onFinish(cause: Throwable?) { - consumer.onFinish(this, cause) - } + public abstract fun onFinish() /** * Get the remaining work to process after a resource consumption. @@ -126,10 +124,10 @@ public abstract class SimAbstractResourceContext( latestFlush = now try { - consumer.onStart(this) + consumer.onEvent(this, SimResourceEvent.Start) activeCommand = interpret(consumer.onNext(this), now) } catch (cause: Throwable) { - doStop(cause) + doFail(cause) } finally { isProcessing = false } @@ -144,9 +142,9 @@ public abstract class SimAbstractResourceContext( latestFlush = clock.millis() flush(isIntermediate = true) - doStop(null) + doStop() } catch (cause: Throwable) { - doStop(cause) + doFail(cause) } finally { isProcessing = false } @@ -214,7 +212,7 @@ public abstract class SimAbstractResourceContext( // Flush remaining work cache _remainingWorkFlush = Long.MIN_VALUE } catch (cause: Throwable) { - doStop(cause) + doFail(cause) } finally { latestFlush = now isProcessing = false @@ -251,13 +249,18 @@ public abstract class SimAbstractResourceContext( /** * Finish the consumer and resource provider. */ - private fun doStop(cause: Throwable?) { + private fun doStop() { val state = state this.state = SimResourceState.Stopped if (state == SimResourceState.Active) { activeCommand = null - onFinish(cause) + try { + consumer.onEvent(this, SimResourceEvent.Exit) + onFinish() + } catch (cause: Throwable) { + doFail(cause) + } } } @@ -272,9 +275,9 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = 0.0 - consumer.onConfirm(this, 0.0) onIdle(deadline) + consumer.onEvent(this, SimResourceEvent.Run) } is SimResourceCommand.Consume -> { val work = command.work @@ -284,14 +287,13 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = min(capacity, limit) - consumer.onConfirm(this, speed) - onConsume(work, limit, deadline) + consumer.onEvent(this, SimResourceEvent.Run) } is SimResourceCommand.Exit -> { speed = 0.0 - doStop(null) + doStop() // No need to set the next active command return null @@ -319,6 +321,23 @@ public abstract class SimAbstractResourceContext( } /** + * Fail the resource consumer. + */ + private fun doFail(cause: Throwable) { + state = SimResourceState.Stopped + activeCommand = null + + try { + consumer.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + onFinish() + } + + /** * Indicate that the capacity of the resource has changed. */ private fun onCapacityChange() { @@ -328,7 +347,8 @@ public abstract class SimAbstractResourceContext( } val isThrottled = speed > capacity - consumer.onCapacityChanged(this, isThrottled) + + consumer.onEvent(this, SimResourceEvent.Capacity) // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 38672b13..4d937514 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -30,13 +30,6 @@ package org.opendc.simulator.resources */ public interface SimResourceConsumer { /** - * This method is invoked when the consumer is started for some resource. - * - * @param ctx The execution context in which the consumer runs. - */ - public fun onStart(ctx: SimResourceContext) {} - - /** * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because * the resource finished processing, reached its deadline or was interrupted. * @@ -46,34 +39,18 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** - * This method is invoked when the resource provider confirms that the consumer is running at the given speed. + * This method is invoked when an event has occurred. * * @param ctx The execution context in which the consumer runs. - * @param speed The speed at which the consumer runs. + * @param event The event that has occurred. */ - public fun onConfirm(ctx: SimResourceContext, speed: Double) {} + public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {} /** - * This is method is invoked when the capacity of the resource changes. - * - * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the - * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly - * causing the active resource command to finish at a later moment than initially planned. + * This method is invoked when a resource consumer throws an exception. * * @param ctx The execution context in which the consumer runs. - * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the - * capacity change. - */ - public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} - - /** - * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], - * the resource finished itself, or a failure occurred at the resource. - * - * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. - * - * @param ctx The execution context in which the consumer ran. - * @param cause The cause of the finish in case the resource finished exceptionally. + * @param cause The cause of the failure. */ - public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} + public fun onFailure(ctx: SimResourceContext, cause: Throwable) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 11dbb09f..7c76c634 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -40,6 +40,11 @@ public interface SimResourceContext { public val capacity: Double /** + * The resource processing speed at this instant. + */ + public val speed: Double + + /** * The amount of work still remaining at this instant. */ public val remainingWork: Double diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index dfdd2c2e..8128c98b 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -90,26 +90,28 @@ public class SimResourceDistributorMaxMin( val remainingWork: Double get() = ctx.remainingWork - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { return doNext(ctx.capacity) } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - super.onFinish(ctx, cause) - - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() - output.close() + output.close() + } + } + else -> {} } } } @@ -370,13 +372,11 @@ public class SimResourceDistributorMaxMin( activeCommand = SimResourceCommand.Consume(work, limit, deadline) } - override fun onFinish(cause: Throwable?) { + override fun onFinish() { reportOvercommit() activeCommand = SimResourceCommand.Exit provider.cancel() - - super.onFinish(cause) } override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt new file mode 100644 index 00000000..959427f1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A resource event that is communicated to the resource consumer. + */ +public enum class SimResourceEvent { + /** + * This event is emitted to the consumer when it has started. + */ + Start, + + /** + * This event is emitted to the consumer when it has exited. + */ + Exit, + + /** + * This event is emitted to the consumer when it has started a new resource consumption or idle cycle. + */ + Run, + + /** + * This event is emitted to the consumer when the capacity of the resource has changed. + */ + Capacity, +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 52b13c5c..2f567a5e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -23,6 +23,8 @@ package org.opendc.simulator.resources import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException /** * A [SimResourceProvider] provides some resource of type [R]. @@ -65,15 +67,27 @@ public interface SimResourceProvider : AutoCloseable { public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { return suspendCancellableCoroutine { cont -> startConsumer(object : SimResourceConsumer by consumer { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - assert(!cont.isCompleted) { "Coroutine already completed" } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + consumer.onEvent(ctx, event) - consumer.onFinish(ctx, cause) + if (event == SimResourceEvent.Exit && !cont.isCompleted) { + cont.resume(Unit) + } + } - cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + try { + consumer.onFailure(ctx, cause) + cont.resumeWithException(cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + cont.resumeWithException(e) + } } override fun toString(): String = "SimSuspendingResourceConsumer" }) + + cont.invokeOnCancellation { cancel() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 157db3cb..fe569096 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -110,7 +110,7 @@ public class SimResourceSource( scheduler.startSingleTimerTo(this, until, ::flush) } - override fun onFinish(cause: Throwable?) { + override fun onFinish() { scheduler.cancel(this) ctx = null @@ -118,8 +118,6 @@ public class SimResourceSource( if (this@SimResourceSource.state != SimResourceState.Stopped) { this@SimResourceSource.state = SimResourceState.Pending } - - super.onFinish(cause) } override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 45e4c220..1a9dd0bc 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -66,10 +66,13 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { availableResources += forwarder input.startConsumer(object : SimResourceConsumer by forwarder { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - // De-register the input after it has finished - _inputs -= input - forwarder.onFinish(ctx, cause) + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + if (event == SimResourceEvent.Exit) { + // De-register the input after it has finished + _inputs -= input + } + + forwarder.onEvent(ctx, event) } }) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index de455021..32f3f573 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -75,7 +75,7 @@ public class SimResourceTransformer( if (delegate != null && ctx != null) { this.delegate = null - delegate.onFinish(ctx) + delegate.onEvent(ctx, SimResourceEvent.Exit) } } @@ -90,10 +90,6 @@ public class SimResourceTransformer( } } - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { val delegate = delegate @@ -110,7 +106,7 @@ public class SimResourceTransformer( // reset beforehand the existing state and check whether it has been updated afterwards reset() - delegate.onFinish(ctx) + delegate.onEvent(ctx, SimResourceEvent.Exit) if (isCoupled || state == SimResourceState.Stopped) SimResourceCommand.Exit @@ -124,21 +120,31 @@ public class SimResourceTransformer( } } - override fun onConfirm(ctx: SimResourceContext, speed: Double) { - delegate?.onConfirm(ctx, speed) - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - delegate?.onCapacityChanged(ctx, isThrottled) + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + } + SimResourceEvent.Exit -> { + this.ctx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onEvent(ctx, SimResourceEvent.Exit) + } + } + else -> delegate?.onEvent(ctx, event) + } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { this.ctx = null val delegate = delegate if (delegate != null) { reset() - delegate.onFinish(ctx, cause) + delegate.onFailure(ctx, cause) } } @@ -147,7 +153,7 @@ public class SimResourceTransformer( */ private fun start() { val delegate = delegate ?: return - delegate.onStart(checkNotNull(ctx)) + delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start) hasDelegateStarted = true } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index 114c7312..4f4ebb14 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.SimResourceEvent import kotlin.math.min /** @@ -53,28 +54,29 @@ public class SimSpeedConsumerAdapter( return delegate.onNext(ctx) } - override fun onConfirm(ctx: SimResourceContext, speed: Double) { - delegate.onConfirm(ctx, speed) - - this.speed = speed - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { val oldSpeed = speed - delegate.onCapacityChanged(ctx, isThrottled) + delegate.onEvent(ctx, event) - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == speed) { - speed = min(ctx.capacity, speed) + when (event) { + SimResourceEvent.Run -> speed = ctx.speed + SimResourceEvent.Capacity -> { + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == speed) { + speed = min(ctx.capacity, speed) + } + } + SimResourceEvent.Exit -> speed = 0.0 + else -> {} } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - super.onFinish(ctx, cause) - + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { speed = 0.0 + + delegate.onFailure(ctx, cause) } override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index a52d1d5d..2e94e1c1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.SimResourceEvent /** * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -33,11 +34,6 @@ import org.opendc.simulator.resources.SimResourceContext public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { private var iterator: Iterator<Fragment>? = null - override fun onStart(ctx: SimResourceContext) { - check(iterator == null) { "Consumer already running" } - iterator = trace.iterator() - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { @@ -57,8 +53,17 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - iterator = null + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + check(iterator == null) { "Consumer already running" } + iterator = trace.iterator() + } + SimResourceEvent.Exit -> { + iterator = null + } + else -> {} + } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index be909556..8c15ec71 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -40,7 +40,7 @@ class SimResourceContextTest { val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} } context.flush() @@ -53,7 +53,7 @@ class SimResourceContextTest { val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} }) @@ -71,7 +71,7 @@ class SimResourceContextTest { val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} }) @@ -83,7 +83,7 @@ class SimResourceContextTest { assertAll( { verify(exactly = 2) { context.onIdle(any()) } }, - { verify(exactly = 1) { context.onFinish(null) } } + { verify(exactly = 1) { context.onFinish() } } ) } @@ -94,7 +94,7 @@ class SimResourceContextTest { val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 39f74481..361a1516 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -77,7 +77,7 @@ class SimResourceSourceTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } finally { scheduler.close() provider.close() @@ -119,13 +119,13 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, clock, scheduler) val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext) { - ctx.interrupt() - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + ctx.interrupt() + } } try { @@ -145,8 +145,12 @@ class SimResourceSourceTest { val consumer = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext) { - resCtx = ctx + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> resCtx = ctx + else -> {} + } } override fun onNext(ctx: SimResourceContext): SimResourceCommand { @@ -181,7 +185,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, clock, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) - every { consumer.onStart(any()) } + every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } .throws(IllegalStateException()) try { diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index f7d17867..1b1f7790 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -120,8 +120,11 @@ internal class SimResourceSwitchExclusiveTest { val workload = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext) { - isFirst = true + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> isFirst = true + else -> {} + } } override fun onNext(ctx: SimResourceContext): SimResourceCommand { diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index d2ad73bc..e3ca5845 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -118,7 +118,7 @@ internal class SimResourceTransformerTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onFinish(any(), null) } + verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) } } @Test @@ -136,8 +136,8 @@ internal class SimResourceTransformerTest { yield() forwarder.cancel() - verify(exactly = 1) { consumer.onStart(any()) } - verify(exactly = 1) { consumer.onFinish(any(), null) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } } @Test @@ -155,8 +155,8 @@ internal class SimResourceTransformerTest { yield() source.cancel() - verify(exactly = 1) { consumer.onStart(any()) } - verify(exactly = 1) { consumer.onFinish(any(), null) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } } @Test @@ -191,7 +191,7 @@ internal class SimResourceTransformerTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } @Test |
