From 5c8cecaf5b8d24ffcd99ce45b922c5a853bd492d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 21 Apr 2021 11:01:07 +0200 Subject: simulator: Simplify scheduling logic of resource aggregator This change simplifies the scheduling logic of the resource aggregator. Previously, after each scheduling cycle, each aggregated input was interrupted. With the new approach, the scheduler can decide which ones of the inputs to send a new command to. --- .../resources/SimAbstractResourceAggregator.kt | 147 +++++++++------------ .../resources/SimResourceAggregatorMaxMin.kt | 38 ++++-- .../simulator/resources/SimResourceSource.kt | 7 +- .../resources/SimResourceAggregatorMaxMinTest.kt | 2 +- 4 files changed, 90 insertions(+), 104 deletions(-) (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index c7fa6a17..f4459c54 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -28,24 +28,6 @@ import java.time.Clock * Abstract implementation of [SimResourceAggregator]. */ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { - /** - * The available resource provider contexts. - */ - protected val inputContexts: Set - get() = _inputContexts - private val _inputContexts = mutableSetOf() - - /** - * The output context. - */ - protected val outputContext: SimResourceContext - get() = context - - /** - * The commands to submit to the underlying input resources. - */ - protected val commands: MutableMap = mutableMapOf() - /** * This method is invoked when the resource consumer consumes resources. */ @@ -54,37 +36,29 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : /** * This method is invoked when the resource consumer enters an idle state. */ - protected open fun doIdle(deadline: Long) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Idle(deadline) - } - } + protected abstract fun doIdle(deadline: Long) /** * This method is invoked when the resource consumer finishes processing. */ - protected open fun doFinish(cause: Throwable?) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Exit - } - } + protected abstract fun doFinish(cause: Throwable?) /** * This method is invoked when an input context is started. */ - protected open fun onContextStarted(ctx: SimResourceContext) { - _inputContexts.add(ctx) - } + protected abstract fun onInputStarted(input: Input) - protected open fun onContextFinished(ctx: SimResourceContext) { - assert(_inputContexts.remove(ctx)) { "Lost context" } - } + /** + * This method is invoked when an input is stopped. + */ + protected abstract fun onInputFinished(input: Input) override fun addInput(input: SimResourceProvider) { check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) + _inputConsumers.add(consumer) input.startConsumer(consumer) } @@ -99,15 +73,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : override val inputs: Set get() = _inputs private val _inputs = mutableSetOf() + private val _inputConsumers = mutableListOf() - private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { + protected val outputContext: SimResourceContext + get() = context + private val context = object : SimAbstractResourceContext(0.0, clock, _output) { override val remainingWork: Double get() { val now = clock.millis() return if (_remainingWorkFlush < now) { _remainingWorkFlush = now - _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + _inputConsumers.sumByDouble { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it } } else { _remainingWork } @@ -115,12 +92,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private var _remainingWork: Double = 0.0 private var _remainingWorkFlush: Long = Long.MIN_VALUE - override fun interrupt() { - super.interrupt() - - interruptAll() - } - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) override fun onIdle(deadline: Long) = doIdle(deadline) @@ -129,80 +100,80 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : doFinish(cause) super.onFinish(cause) - - interruptAll() } } /** - * A flag to indicate that an interrupt is active. - */ - private var isInterrupting: Boolean = false - - /** - * Schedule the work over the input resources. + * An input for the resource aggregator. */ - private fun doSchedule() { - context.flush(isIntermediate = true) - interruptAll() + public interface Input { + /** + * The [SimResourceContext] associated with the input. + */ + public val ctx: SimResourceContext + + /** + * Push the specified [SimResourceCommand] to the input. + */ + public fun push(command: SimResourceCommand) } /** - * Interrupt all inputs. + * An internal [SimResourceConsumer] implementation for aggregator inputs. */ - private fun interruptAll() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isInterrupting) { - return + private inner class Consumer : Input, SimResourceConsumer { + /** + * The resource context associated with the input. + */ + override val ctx: SimResourceContext + get() = _ctx!! + var _ctx: SimResourceContext? = null + + /** + * The resource command to run next. + */ + private var command: SimResourceCommand? = null + + /* Input */ + override fun push(command: SimResourceCommand) { + this.command = command + _ctx?.interrupt() } - try { - isInterrupting = true - - val iterator = _inputs.iterator() - while (iterator.hasNext()) { - val input = iterator.next() - input.interrupt() - - if (input.state != SimResourceState.Active) { - iterator.remove() - } - } - } finally { - isInterrupting = false - } - } - - /** - * An internal [SimResourceConsumer] implementation for aggregator inputs. - */ - private inner class Consumer : SimResourceConsumer { + /* SimResourceConsumer */ override fun onStart(ctx: SimResourceContext) { - onContextStarted(ctx) + _ctx = ctx onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { context.start() } + + onInputStarted(this) } override fun onNext(ctx: SimResourceContext): SimResourceCommand { - doSchedule() - - return commands[ctx] ?: SimResourceCommand.Idle() + var next = command + + return if (next != null) { + this.command = null + next + } else { + context.flush(isIntermediate = true) + next = command + this.command = null + next ?: SimResourceCommand.Idle() + } } override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { // Adjust capacity of output resource - context.capacity = inputContexts.sumByDouble { it.capacity } + context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } } override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - onContextFinished(ctx) - - super.onFinish(ctx, cause) + onInputFinished(this) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 08bc064e..5d550ad8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -28,36 +28,46 @@ import java.time.Clock * A [SimResourceAggregator] that distributes the load equally across the input resources. */ public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { - private val consumers = mutableListOf() + private val consumers = mutableListOf() override fun doConsume(work: Double, limit: Double, deadline: Long) { // Sort all consumers by their capacity - consumers.sortWith(compareBy { it.capacity }) + consumers.sortWith(compareBy { it.ctx.capacity }) // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { - val inputCapacity = input.capacity + val inputCapacity = input.ctx.capacity val fraction = inputCapacity / outputContext.capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work - commands[input] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) + val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + input.push(command) } } - override fun onContextStarted(ctx: SimResourceContext) { - super.onContextStarted(ctx) + override fun doIdle(deadline: Long) { + for (input in consumers) { + input.push(SimResourceCommand.Idle(deadline)) + } + } - consumers.add(ctx) + override fun doFinish(cause: Throwable?) { + val iterator = consumers.iterator() + for (input in iterator) { + iterator.remove() + input.push(SimResourceCommand.Exit) + } } - override fun onContextFinished(ctx: SimResourceContext) { - super.onContextFinished(ctx) + override fun onInputStarted(input: Input) { + consumers.add(input) + } - consumers.remove(ctx) + override fun onInputFinished(input: Input) { + consumers.remove(input) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 025b0406..157db3cb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -112,7 +112,12 @@ public class SimResourceSource( override fun onFinish(cause: Throwable?) { scheduler.cancel(this) - cancel() + + ctx = null + + if (this@SimResourceSource.state != SimResourceState.Stopped) { + this@SimResourceSource.state = SimResourceState.Pending + } super.onFinish(cause) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index e272abb8..e78bcdac 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -139,7 +139,7 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) + .andThenThrows(IllegalStateException("Test Exception")) try { assertThrows { aggregator.output.consume(consumer) } -- cgit v1.2.3 From 980b016452b3889585feaf2dbbe3244c921123b0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 22 Apr 2021 17:18:59 +0200 Subject: simulator: Add generic approach for reporting resource events This change introduces a generic approach for reporting resource events to resource consumers. This way we reduce the boilerplate of the SimResourceConsumer interface. --- .../resources/SimAbstractResourceAggregator.kt | 45 ++++++++++--------- .../resources/SimAbstractResourceContext.kt | 52 +++++++++++++++------- .../simulator/resources/SimResourceConsumer.kt | 35 +++------------ .../simulator/resources/SimResourceContext.kt | 5 +++ .../resources/SimResourceDistributorMaxMin.kt | 34 +++++++------- .../opendc/simulator/resources/SimResourceEvent.kt | 48 ++++++++++++++++++++ .../simulator/resources/SimResourceProvider.kt | 22 +++++++-- .../simulator/resources/SimResourceSource.kt | 4 +- .../resources/SimResourceSwitchExclusive.kt | 11 +++-- .../simulator/resources/SimResourceTransformer.kt | 36 ++++++++------- .../resources/consumer/SimSpeedConsumerAdapter.kt | 32 ++++++------- .../resources/consumer/SimTraceConsumer.kt | 19 +++++--- .../simulator/resources/SimResourceContextTest.kt | 10 ++--- .../simulator/resources/SimResourceSourceTest.kt | 20 +++++---- .../resources/SimResourceSwitchExclusiveTest.kt | 7 ++- .../resources/SimResourceTransformerTest.kt | 12 ++--- 16 files changed, 239 insertions(+), 153 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index f4459c54..1bcaf45f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -96,10 +96,8 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : override fun onIdle(deadline: Long) = doIdle(deadline) - override fun onFinish(cause: Throwable?) { - doFinish(cause) - - super.onFinish(cause) + override fun onFinish() { + doFinish(null) } } @@ -134,6 +132,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : */ private var command: SimResourceCommand? = null + private fun updateCapacity() { + // Adjust capacity of output resource + context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } + } + /* Input */ override fun push(command: SimResourceCommand) { this.command = command @@ -141,18 +144,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : } /* SimResourceConsumer */ - override fun onStart(ctx: SimResourceContext) { - _ctx = ctx - onCapacityChanged(ctx, false) - - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - - onInputStarted(this) - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { var next = command @@ -167,13 +158,23 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : } } - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - // Adjust capacity of output resource - context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } - } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + _ctx = ctx + updateCapacity() + + // Make sure we initialize the output if we have not done so yet + if (context.state == SimResourceState.Pending) { + context.start() + } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - onInputFinished(this) + onInputStarted(this) + } + SimResourceEvent.Capacity -> updateCapacity() + SimResourceEvent.Exit -> onInputFinished(this) + else -> {} + } } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 05ed0714..d2f585b1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -75,7 +75,7 @@ public abstract class SimAbstractResourceContext( /** * The current processing speed of the resource. */ - public var speed: Double = 0.0 + final override var speed: Double = 0.0 private set /** @@ -92,9 +92,7 @@ public abstract class SimAbstractResourceContext( /** * This method is invoked when the resource consumer has finished. */ - public open fun onFinish(cause: Throwable?) { - consumer.onFinish(this, cause) - } + public abstract fun onFinish() /** * Get the remaining work to process after a resource consumption. @@ -126,10 +124,10 @@ public abstract class SimAbstractResourceContext( latestFlush = now try { - consumer.onStart(this) + consumer.onEvent(this, SimResourceEvent.Start) activeCommand = interpret(consumer.onNext(this), now) } catch (cause: Throwable) { - doStop(cause) + doFail(cause) } finally { isProcessing = false } @@ -144,9 +142,9 @@ public abstract class SimAbstractResourceContext( latestFlush = clock.millis() flush(isIntermediate = true) - doStop(null) + doStop() } catch (cause: Throwable) { - doStop(cause) + doFail(cause) } finally { isProcessing = false } @@ -214,7 +212,7 @@ public abstract class SimAbstractResourceContext( // Flush remaining work cache _remainingWorkFlush = Long.MIN_VALUE } catch (cause: Throwable) { - doStop(cause) + doFail(cause) } finally { latestFlush = now isProcessing = false @@ -251,13 +249,18 @@ public abstract class SimAbstractResourceContext( /** * Finish the consumer and resource provider. */ - private fun doStop(cause: Throwable?) { + private fun doStop() { val state = state this.state = SimResourceState.Stopped if (state == SimResourceState.Active) { activeCommand = null - onFinish(cause) + try { + consumer.onEvent(this, SimResourceEvent.Exit) + onFinish() + } catch (cause: Throwable) { + doFail(cause) + } } } @@ -272,9 +275,9 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = 0.0 - consumer.onConfirm(this, 0.0) onIdle(deadline) + consumer.onEvent(this, SimResourceEvent.Run) } is SimResourceCommand.Consume -> { val work = command.work @@ -284,14 +287,13 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } speed = min(capacity, limit) - consumer.onConfirm(this, speed) - onConsume(work, limit, deadline) + consumer.onEvent(this, SimResourceEvent.Run) } is SimResourceCommand.Exit -> { speed = 0.0 - doStop(null) + doStop() // No need to set the next active command return null @@ -318,6 +320,23 @@ public abstract class SimAbstractResourceContext( } } + /** + * Fail the resource consumer. + */ + private fun doFail(cause: Throwable) { + state = SimResourceState.Stopped + activeCommand = null + + try { + consumer.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + onFinish() + } + /** * Indicate that the capacity of the resource has changed. */ @@ -328,7 +347,8 @@ public abstract class SimAbstractResourceContext( } val isThrottled = speed > capacity - consumer.onCapacityChanged(this, isThrottled) + + consumer.onEvent(this, SimResourceEvent.Capacity) // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 38672b13..4d937514 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -29,13 +29,6 @@ package org.opendc.simulator.resources * for multiple resource providers, unless explicitly said otherwise. */ public interface SimResourceConsumer { - /** - * This method is invoked when the consumer is started for some resource. - * - * @param ctx The execution context in which the consumer runs. - */ - public fun onStart(ctx: SimResourceContext) {} - /** * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because * the resource finished processing, reached its deadline or was interrupted. @@ -46,34 +39,18 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** - * This method is invoked when the resource provider confirms that the consumer is running at the given speed. + * This method is invoked when an event has occurred. * * @param ctx The execution context in which the consumer runs. - * @param speed The speed at which the consumer runs. + * @param event The event that has occurred. */ - public fun onConfirm(ctx: SimResourceContext, speed: Double) {} + public fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {} /** - * This is method is invoked when the capacity of the resource changes. - * - * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the - * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly - * causing the active resource command to finish at a later moment than initially planned. + * This method is invoked when a resource consumer throws an exception. * * @param ctx The execution context in which the consumer runs. - * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the - * capacity change. - */ - public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} - - /** - * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], - * the resource finished itself, or a failure occurred at the resource. - * - * Note that throwing an exception in [onStart] or [onNext] is undefined behavior and up to the resource provider. - * - * @param ctx The execution context in which the consumer ran. - * @param cause The cause of the finish in case the resource finished exceptionally. + * @param cause The cause of the failure. */ - public fun onFinish(ctx: SimResourceContext, cause: Throwable? = null) {} + public fun onFailure(ctx: SimResourceContext, cause: Throwable) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 11dbb09f..7c76c634 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -39,6 +39,11 @@ public interface SimResourceContext { */ public val capacity: Double + /** + * The resource processing speed at this instant. + */ + public val speed: Double + /** * The amount of work still remaining at this instant. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index dfdd2c2e..8128c98b 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -90,26 +90,28 @@ public class SimResourceDistributorMaxMin( val remainingWork: Double get() = ctx.remainingWork - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { return doNext(ctx.capacity) } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - super.onFinish(ctx, cause) - - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() - output.close() + output.close() + } + } + else -> {} } } } @@ -370,13 +372,11 @@ public class SimResourceDistributorMaxMin( activeCommand = SimResourceCommand.Consume(work, limit, deadline) } - override fun onFinish(cause: Throwable?) { + override fun onFinish() { reportOvercommit() activeCommand = SimResourceCommand.Exit provider.cancel() - - super.onFinish(cause) } override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt new file mode 100644 index 00000000..959427f1 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceEvent.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A resource event that is communicated to the resource consumer. + */ +public enum class SimResourceEvent { + /** + * This event is emitted to the consumer when it has started. + */ + Start, + + /** + * This event is emitted to the consumer when it has exited. + */ + Exit, + + /** + * This event is emitted to the consumer when it has started a new resource consumption or idle cycle. + */ + Run, + + /** + * This event is emitted to the consumer when the capacity of the resource has changed. + */ + Capacity, +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 52b13c5c..2f567a5e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -23,6 +23,8 @@ package org.opendc.simulator.resources import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException /** * A [SimResourceProvider] provides some resource of type [R]. @@ -65,15 +67,27 @@ public interface SimResourceProvider : AutoCloseable { public suspend fun SimResourceProvider.consume(consumer: SimResourceConsumer) { return suspendCancellableCoroutine { cont -> startConsumer(object : SimResourceConsumer by consumer { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - assert(!cont.isCompleted) { "Coroutine already completed" } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + consumer.onEvent(ctx, event) - consumer.onFinish(ctx, cause) + if (event == SimResourceEvent.Exit && !cont.isCompleted) { + cont.resume(Unit) + } + } - cont.resumeWith(if (cause != null) Result.failure(cause) else Result.success(Unit)) + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { + try { + consumer.onFailure(ctx, cause) + cont.resumeWithException(cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + cont.resumeWithException(e) + } } override fun toString(): String = "SimSuspendingResourceConsumer" }) + + cont.invokeOnCancellation { cancel() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 157db3cb..fe569096 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -110,7 +110,7 @@ public class SimResourceSource( scheduler.startSingleTimerTo(this, until, ::flush) } - override fun onFinish(cause: Throwable?) { + override fun onFinish() { scheduler.cancel(this) ctx = null @@ -118,8 +118,6 @@ public class SimResourceSource( if (this@SimResourceSource.state != SimResourceState.Stopped) { this@SimResourceSource.state = SimResourceState.Pending } - - super.onFinish(cause) } override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 45e4c220..1a9dd0bc 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -66,10 +66,13 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { availableResources += forwarder input.startConsumer(object : SimResourceConsumer by forwarder { - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - // De-register the input after it has finished - _inputs -= input - forwarder.onFinish(ctx, cause) + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + if (event == SimResourceEvent.Exit) { + // De-register the input after it has finished + _inputs -= input + } + + forwarder.onEvent(ctx, event) } }) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index de455021..32f3f573 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -75,7 +75,7 @@ public class SimResourceTransformer( if (delegate != null && ctx != null) { this.delegate = null - delegate.onFinish(ctx) + delegate.onEvent(ctx, SimResourceEvent.Exit) } } @@ -90,10 +90,6 @@ public class SimResourceTransformer( } } - override fun onStart(ctx: SimResourceContext) { - this.ctx = ctx - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { val delegate = delegate @@ -110,7 +106,7 @@ public class SimResourceTransformer( // reset beforehand the existing state and check whether it has been updated afterwards reset() - delegate.onFinish(ctx) + delegate.onEvent(ctx, SimResourceEvent.Exit) if (isCoupled || state == SimResourceState.Stopped) SimResourceCommand.Exit @@ -124,21 +120,31 @@ public class SimResourceTransformer( } } - override fun onConfirm(ctx: SimResourceContext, speed: Double) { - delegate?.onConfirm(ctx, speed) - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { - delegate?.onCapacityChanged(ctx, isThrottled) + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + } + SimResourceEvent.Exit -> { + this.ctx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onEvent(ctx, SimResourceEvent.Exit) + } + } + else -> delegate?.onEvent(ctx, event) + } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { this.ctx = null val delegate = delegate if (delegate != null) { reset() - delegate.onFinish(ctx, cause) + delegate.onFailure(ctx, cause) } } @@ -147,7 +153,7 @@ public class SimResourceTransformer( */ private fun start() { val delegate = delegate ?: return - delegate.onStart(checkNotNull(ctx)) + delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start) hasDelegateStarted = true } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index 114c7312..4f4ebb14 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.SimResourceEvent import kotlin.math.min /** @@ -53,28 +54,29 @@ public class SimSpeedConsumerAdapter( return delegate.onNext(ctx) } - override fun onConfirm(ctx: SimResourceContext, speed: Double) { - delegate.onConfirm(ctx, speed) - - this.speed = speed - } - - override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { val oldSpeed = speed - delegate.onCapacityChanged(ctx, isThrottled) + delegate.onEvent(ctx, event) - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == speed) { - speed = min(ctx.capacity, speed) + when (event) { + SimResourceEvent.Run -> speed = ctx.speed + SimResourceEvent.Capacity -> { + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == speed) { + speed = min(ctx.capacity, speed) + } + } + SimResourceEvent.Exit -> speed = 0.0 + else -> {} } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - super.onFinish(ctx, cause) - + override fun onFailure(ctx: SimResourceContext, cause: Throwable) { speed = 0.0 + + delegate.onFailure(ctx, cause) } override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index a52d1d5d..2e94e1c1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import org.opendc.simulator.resources.SimResourceEvent /** * A [SimResourceConsumer] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -33,11 +34,6 @@ import org.opendc.simulator.resources.SimResourceContext public class SimTraceConsumer(private val trace: Sequence) : SimResourceConsumer { private var iterator: Iterator? = null - override fun onStart(ctx: SimResourceContext) { - check(iterator == null) { "Consumer already running" } - iterator = trace.iterator() - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { @@ -57,8 +53,17 @@ public class SimTraceConsumer(private val trace: Sequence) : SimResour } } - override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - iterator = null + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + check(iterator == null) { "Consumer already running" } + iterator = trace.iterator() + } + SimResourceEvent.Exit -> { + iterator = null + } + else -> {} + } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index be909556..8c15ec71 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -40,7 +40,7 @@ class SimResourceContextTest { val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} } context.flush() @@ -53,7 +53,7 @@ class SimResourceContextTest { val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} }) @@ -71,7 +71,7 @@ class SimResourceContextTest { val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} }) @@ -83,7 +83,7 @@ class SimResourceContextTest { assertAll( { verify(exactly = 2) { context.onIdle(any()) } }, - { verify(exactly = 1) { context.onFinish(null) } } + { verify(exactly = 1) { context.onFinish() } } ) } @@ -94,7 +94,7 @@ class SimResourceContextTest { val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { override fun onIdle(deadline: Long) {} - override fun onFinish(cause: Throwable?) {} + override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 39f74481..361a1516 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -77,7 +77,7 @@ class SimResourceSourceTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } finally { scheduler.close() provider.close() @@ -119,13 +119,13 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, clock, scheduler) val consumer = object : SimResourceConsumer { - override fun onStart(ctx: SimResourceContext) { - ctx.interrupt() - } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { return SimResourceCommand.Exit } + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + ctx.interrupt() + } } try { @@ -145,8 +145,12 @@ class SimResourceSourceTest { val consumer = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext) { - resCtx = ctx + + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> resCtx = ctx + else -> {} + } } override fun onNext(ctx: SimResourceContext): SimResourceCommand { @@ -181,7 +185,7 @@ class SimResourceSourceTest { val provider = SimResourceSource(capacity, clock, scheduler) val consumer = mockk(relaxUnitFun = true) - every { consumer.onStart(any()) } + every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } .throws(IllegalStateException()) try { diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index f7d17867..1b1f7790 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -120,8 +120,11 @@ internal class SimResourceSwitchExclusiveTest { val workload = object : SimResourceConsumer { var isFirst = true - override fun onStart(ctx: SimResourceContext) { - isFirst = true + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> isFirst = true + else -> {} + } } override fun onNext(ctx: SimResourceContext): SimResourceCommand { diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index d2ad73bc..e3ca5845 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -118,7 +118,7 @@ internal class SimResourceTransformerTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onFinish(any(), null) } + verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Exit) } } @Test @@ -136,8 +136,8 @@ internal class SimResourceTransformerTest { yield() forwarder.cancel() - verify(exactly = 1) { consumer.onStart(any()) } - verify(exactly = 1) { consumer.onFinish(any(), null) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } } @Test @@ -155,8 +155,8 @@ internal class SimResourceTransformerTest { yield() source.cancel() - verify(exactly = 1) { consumer.onStart(any()) } - verify(exactly = 1) { consumer.onFinish(any(), null) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Exit) } } @Test @@ -191,7 +191,7 @@ internal class SimResourceTransformerTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onCapacityChanged(any(), true) } + verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } @Test -- cgit v1.2.3 From b5d6aa7f384ea9d6a1a40965e883ac6403c302fd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 23 Apr 2021 16:41:19 +0200 Subject: simulator: Fix compute benchmarks This change fixes an issue with the compute benchmarks where the workload was being re-used across iterations. --- .../opendc/simulator/compute/BenchmarkHelpers.kt | 43 ---------------------- .../simulator/compute/SimMachineBenchmarks.kt | 27 +++++++++----- 2 files changed, 17 insertions(+), 53 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt deleted file mode 100644 index 43bbfd0b..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/BenchmarkHelpers.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute - -import org.opendc.simulator.compute.workload.SimTraceWorkload - -/** - * Helper function to create simple consumer workload. - */ -fun createSimpleConsumer(): SimTraceWorkload { - return SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(1000, 28.0, 1), - SimTraceWorkload.Fragment(1000, 3500.0, 1), - SimTraceWorkload.Fragment(1000, 0.0, 1), - SimTraceWorkload.Fragment(1000, 183.0, 1), - SimTraceWorkload.Fragment(1000, 400.0, 1), - SimTraceWorkload.Fragment(1000, 100.0, 1), - SimTraceWorkload.Fragment(1000, 3000.0, 1), - SimTraceWorkload.Fragment(1000, 4500.0, 1), - ), - ) -} diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index 7b97a665..bae31921 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -31,12 +31,11 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation import org.opendc.utils.TimerScheduler import org.openjdk.jmh.annotations.* -import java.time.Clock import java.util.concurrent.TimeUnit @State(Scope.Thread) @@ -46,14 +45,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var clock: Clock private lateinit var scheduler: TimerScheduler private lateinit var machineModel: SimMachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = TimerScheduler(scope.coroutineContext, clock) + scheduler = TimerScheduler(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -65,11 +63,20 @@ class SimMachineBenchmarks { @State(Scope.Thread) class Workload { - lateinit var workloads: Array + lateinit var trace: Sequence @Setup fun setUp() { - workloads = Array(2) { createSimpleConsumer() } + trace = sequenceOf( + SimTraceWorkload.Fragment(1000, 28.0, 1), + SimTraceWorkload.Fragment(1000, 3500.0, 1), + SimTraceWorkload.Fragment(1000, 0.0, 1), + SimTraceWorkload.Fragment(1000, 183.0, 1), + SimTraceWorkload.Fragment(1000, 400.0, 1), + SimTraceWorkload.Fragment(1000, 100.0, 1), + SimTraceWorkload.Fragment(1000, 3000.0, 1), + SimTraceWorkload.Fragment(1000, 4500.0, 1), + ) } } @@ -80,7 +87,7 @@ class SimMachineBenchmarks { coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - return@runBlockingSimulation machine.run(state.workloads[0]) + return@runBlockingSimulation machine.run(SimTraceWorkload(state.trace)) } } @@ -98,7 +105,7 @@ class SimMachineBenchmarks { val vm = hypervisor.createMachine(machineModel) try { - return@runBlockingSimulation vm.run(state.workloads[0]) + return@runBlockingSimulation vm.run(SimTraceWorkload(state.trace)) } finally { vm.close() machine.close() @@ -120,7 +127,7 @@ class SimMachineBenchmarks { val vm = hypervisor.createMachine(machineModel) try { - return@runBlockingSimulation vm.run(state.workloads[0]) + return@runBlockingSimulation vm.run(SimTraceWorkload(state.trace)) } finally { vm.close() machine.close() @@ -145,7 +152,7 @@ class SimMachineBenchmarks { launch { try { - vm.run(state.workloads[i]) + vm.run(SimTraceWorkload(state.trace)) } finally { machine.close() } -- cgit v1.2.3 From 80335a49513f3e74228aa1bfb998dd54855f68e2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 23 Apr 2021 17:15:25 +0200 Subject: simulator: Introduce SimResourceScheduler This change introduces the SimResourceScheduler interface, which is a generic interface for scheduling the coordination and synchronization between resource providers and resource consumers. This interface replaces the need for users to manually specify the clock and coroutine context per resource provider. --- .../simulator/compute/SimMachineBenchmarks.kt | 15 ++-- .../simulator/compute/SimBareMetalMachine.kt | 6 +- .../simulator/compute/SimFairShareHypervisor.kt | 4 +- .../compute/SimFairShareHypervisorProvider.kt | 8 +- .../simulator/compute/SimHypervisorProvider.kt | 5 +- .../compute/SimSpaceSharedHypervisorProvider.kt | 7 +- .../opendc/simulator/compute/SimHypervisorTest.kt | 5 +- .../opendc/simulator/resources/BenchmarkHelpers.kt | 43 ---------- .../simulator/resources/SimResourceBenchmarks.kt | 57 +++++++------ .../resources/SimAbstractResourceAggregator.kt | 6 +- .../resources/SimAbstractResourceContext.kt | 26 +++--- .../resources/SimResourceAggregatorMaxMin.kt | 4 +- .../resources/SimResourceDistributorMaxMin.kt | 21 +++-- .../simulator/resources/SimResourceFlushable.kt | 37 +++++++++ .../simulator/resources/SimResourceScheduler.kt | 69 ++++++++++++++++ .../resources/SimResourceSchedulerTrampoline.kt | 95 ++++++++++++++++++++++ .../simulator/resources/SimResourceSource.kt | 15 ++-- .../simulator/resources/SimResourceSwitchMaxMin.kt | 7 +- .../resources/SimResourceAggregatorMaxMinTest.kt | 49 ++++++----- .../simulator/resources/SimResourceContextTest.kt | 53 ++++++++++-- .../simulator/resources/SimResourceSourceTest.kt | 66 ++++++--------- .../resources/SimResourceSwitchExclusiveTest.kt | 17 ++-- .../resources/SimResourceSwitchMaxMinTest.kt | 22 ++--- .../resources/SimResourceTransformerTest.kt | 30 ++++--- .../simulator/resources/SimWorkConsumerTest.kt | 9 +- 25 files changed, 436 insertions(+), 240 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt create mode 100644 opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index bae31921..15714aca 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -34,7 +34,8 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.utils.TimerScheduler +import org.opendc.simulator.resources.SimResourceScheduler +import org.opendc.simulator.resources.SimResourceSchedulerTrampoline import org.openjdk.jmh.annotations.* import java.util.concurrent.TimeUnit @@ -45,13 +46,13 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: TimerScheduler + private lateinit var scheduler: SimResourceScheduler private lateinit var machineModel: SimMachineModel @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = TimerScheduler(scope.coroutineContext, scope.clock) + scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) @@ -61,7 +62,7 @@ class SimMachineBenchmarks { ) } - @State(Scope.Thread) + @State(Scope.Benchmark) class Workload { lateinit var trace: Sequence @@ -120,7 +121,7 @@ class SimMachineBenchmarks { coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor() + val hypervisor = SimFairShareHypervisor(scheduler) launch { machine.run(hypervisor) } @@ -142,12 +143,12 @@ class SimMachineBenchmarks { coroutineContext, clock, machineModel, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor() + val hypervisor = SimFairShareHypervisor(scheduler) launch { machine.run(hypervisor) } coroutineScope { - repeat(2) { i -> + repeat(2) { val vm = hypervisor.createMachine(machineModel) launch { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 09ee601e..27ebba21 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* import org.opendc.simulator.compute.cpufreq.ScalingDriver import org.opendc.simulator.compute.cpufreq.ScalingGovernor import org.opendc.simulator.compute.model.ProcessingUnit @@ -60,7 +59,7 @@ public class SimBareMetalMachine( /** * The [TimerScheduler] to use for scheduling the interrupts. */ - private val scheduler = TimerScheduler(this.context, clock) + private val scheduler = SimResourceSchedulerTrampoline(this.context, clock) override val cpus: List = model.cpus.map { ProcessingUnitImpl(it) } @@ -96,7 +95,6 @@ public class SimBareMetalMachine( override fun close() { super.close() - scheduler.close() scope.cancel() } @@ -107,7 +105,7 @@ public class SimBareMetalMachine( /** * The actual resource supporting the processing unit. */ - private val source = SimResourceSource(model.frequency, clock, scheduler) + private val source = SimResourceSource(model.frequency, scheduler) override val speed: Double get() = source.speed diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt index fa677de9..11aec2de 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisor.kt @@ -31,13 +31,13 @@ import org.opendc.simulator.resources.* * * @param listener The hypervisor listener to use. */ -public class SimFairShareHypervisor(private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { +public class SimFairShareHypervisor(private val scheduler: SimResourceScheduler, private val listener: SimHypervisor.Listener? = null) : SimAbstractHypervisor() { override fun canFit(model: SimMachineModel, switch: SimResourceSwitch): Boolean = true override fun createSwitch(ctx: SimMachineContext): SimResourceSwitch { return SimResourceSwitchMaxMin( - ctx.clock, + scheduler, object : SimResourceSwitchMaxMin.Listener { override fun onSliceFinish( switch: SimResourceSwitchMaxMin, diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt index 02eb6ad0..2ab3ea09 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairShareHypervisorProvider.kt @@ -22,11 +22,17 @@ package org.opendc.simulator.compute +import org.opendc.simulator.resources.SimResourceSchedulerTrampoline +import java.time.Clock +import kotlin.coroutines.CoroutineContext + /** * A [SimHypervisorProvider] for the [SimFairShareHypervisor] implementation. */ public class SimFairShareHypervisorProvider : SimHypervisorProvider { override val id: String = "fair-share" - override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimFairShareHypervisor(listener) + override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { + return SimFairShareHypervisor(SimResourceSchedulerTrampoline(context, clock), listener) + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt index a5b4526b..b66020f4 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisorProvider.kt @@ -22,6 +22,9 @@ package org.opendc.simulator.compute +import java.time.Clock +import kotlin.coroutines.CoroutineContext + /** * A service provider interface for constructing a [SimHypervisor]. */ @@ -37,5 +40,5 @@ public interface SimHypervisorProvider { /** * Create a [SimHypervisor] instance with the specified [listener]. */ - public fun create(listener: SimHypervisor.Listener? = null): SimHypervisor + public fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener? = null): SimHypervisor } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt index e2044d05..83b924d7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorProvider.kt @@ -22,11 +22,16 @@ package org.opendc.simulator.compute +import java.time.Clock +import kotlin.coroutines.CoroutineContext + /** * A [SimHypervisorProvider] for the [SimSpaceSharedHypervisor] implementation. */ public class SimSpaceSharedHypervisorProvider : SimHypervisorProvider { override val id: String = "space-shared" - override fun create(listener: SimHypervisor.Listener?): SimHypervisor = SimSpaceSharedHypervisor() + override fun create(context: CoroutineContext, clock: Clock, listener: SimHypervisor.Listener?): SimHypervisor { + return SimSpaceSharedHypervisor() + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index a067dd2e..8886caa7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -39,6 +39,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.SimResourceSchedulerTrampoline /** * Test suite for the [SimHypervisor] class. @@ -93,7 +94,7 @@ internal class SimHypervisorTest { ) val machine = SimBareMetalMachine(coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0))) - val hypervisor = SimFairShareHypervisor(listener) + val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) launch { machine.run(hypervisor) @@ -167,7 +168,7 @@ internal class SimHypervisorTest { coroutineContext, clock, model, PerformanceScalingGovernor(), SimpleScalingDriver(ConstantPowerModel(0.0)) ) - val hypervisor = SimFairShareHypervisor(listener) + val hypervisor = SimFairShareHypervisor(SimResourceSchedulerTrampoline(coroutineContext, clock), listener) launch { machine.run(hypervisor) diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt deleted file mode 100644 index 8d2587b1..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/BenchmarkHelpers.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.simulator.resources.consumer.SimTraceConsumer - -/** - * Helper function to create simple consumer workload. - */ -fun createSimpleConsumer(): SimResourceConsumer { - return SimTraceConsumer( - sequenceOf( - SimTraceConsumer.Fragment(1000, 28.0), - SimTraceConsumer.Fragment(1000, 3500.0), - SimTraceConsumer.Fragment(1000, 0.0), - SimTraceConsumer.Fragment(1000, 183.0), - SimTraceConsumer.Fragment(1000, 400.0), - SimTraceConsumer.Fragment(1000, 100.0), - SimTraceConsumer.Fragment(1000, 3000.0), - SimTraceConsumer.Fragment(1000, 4500.0), - ), - ) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index beda3eaa..cd5f33bd 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -26,7 +26,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.utils.TimerScheduler +import org.opendc.simulator.resources.consumer.SimTraceConsumer import org.openjdk.jmh.annotations.* import java.util.concurrent.TimeUnit @@ -37,67 +37,76 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimResourceBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: TimerScheduler + private lateinit var scheduler: SimResourceScheduler @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = TimerScheduler(scope.coroutineContext, scope.clock) + scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) } @State(Scope.Thread) class Workload { - lateinit var consumers: Array + lateinit var trace: Sequence @Setup fun setUp() { - consumers = Array(3) { createSimpleConsumer() } + trace = sequenceOf( + SimTraceConsumer.Fragment(1000, 28.0), + SimTraceConsumer.Fragment(1000, 3500.0), + SimTraceConsumer.Fragment(1000, 0.0), + SimTraceConsumer.Fragment(1000, 183.0), + SimTraceConsumer.Fragment(1000, 400.0), + SimTraceConsumer.Fragment(1000, 100.0), + SimTraceConsumer.Fragment(1000, 3000.0), + SimTraceConsumer.Fragment(1000, 4500.0), + ) } } @Benchmark fun benchmarkSource(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, clock, scheduler) - return@runBlockingSimulation provider.consume(state.consumers[0]) + val provider = SimResourceSource(4200.0, scheduler) + return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @Benchmark fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, clock, scheduler) + val provider = SimResourceSource(4200.0, scheduler) val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(state.consumers[0]) + return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) } } @Benchmark fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(clock) + val switch = SimResourceSwitchMaxMin(scheduler) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) val provider = switch.addOutput(3500.0) - return@runBlockingSimulation provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @Benchmark fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(clock) + val switch = SimResourceSwitchMaxMin(scheduler) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) repeat(3) { i -> launch { val provider = switch.addOutput(3500.0) - provider.consume(state.consumers[i]) + provider.consume(SimTraceConsumer(state.trace)) } } } @@ -108,11 +117,11 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) val provider = switch.addOutput(3500.0) - return@runBlockingSimulation provider.consume(state.consumers[0]) + return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -121,13 +130,13 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) - switch.addInput(SimResourceSource(3000.0, clock, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, scheduler)) - repeat(2) { i -> + repeat(2) { launch { val provider = switch.addOutput(3500.0) - provider.consume(state.consumers[i]) + provider.consume(SimTraceConsumer(state.trace)) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 1bcaf45f..6ae04f27 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -22,12 +22,10 @@ package org.opendc.simulator.resources -import java.time.Clock - /** * Abstract implementation of [SimResourceAggregator]. */ -public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { +public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator { /** * This method is invoked when the resource consumer consumes resources. */ @@ -77,7 +75,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : protected val outputContext: SimResourceContext get() = context - private val context = object : SimAbstractResourceContext(0.0, clock, _output) { + private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) { override val remainingWork: Double get() { val now = clock.millis() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index d2f585b1..c03bfad5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -31,9 +31,16 @@ import kotlin.math.min */ public abstract class SimAbstractResourceContext( initialCapacity: Double, - override val clock: Clock, + private val scheduler: SimResourceScheduler, private val consumer: SimResourceConsumer -) : SimResourceContext { +) : SimResourceContext, SimResourceFlushable { + + /** + * The clock of the context. + */ + public override val clock: Clock + get() = scheduler.clock + /** * The capacity of the resource. */ @@ -143,21 +150,12 @@ public abstract class SimAbstractResourceContext( flush(isIntermediate = true) doStop() - } catch (cause: Throwable) { - doFail(cause) } finally { isProcessing = false } } - /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun flush(isIntermediate: Boolean = false) { + override fun flush(isIntermediate: Boolean) { // Flush is no-op when the consumer is finished or not yet started if (state != SimResourceState.Active) { return @@ -226,7 +224,7 @@ public abstract class SimAbstractResourceContext( return } - flush() + scheduler.schedule(this, isIntermediate = false) } override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" @@ -234,7 +232,7 @@ public abstract class SimAbstractResourceContext( /** * A flag to indicate that the resource is currently processing a command. */ - protected var isProcessing: Boolean = false + private var isProcessing: Boolean = false /** * The current command that is being processed. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 5d550ad8..5665abd1 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -22,12 +22,10 @@ package org.opendc.simulator.resources -import java.time.Clock - /** * A [SimResourceAggregator] that distributes the load equally across the input resources. */ -public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { +public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) { private val consumers = mutableListOf() override fun doConsume(work: Double, limit: Double, deadline: Long) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 8128c98b..a76cb1e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources -import java.time.Clock import kotlin.math.max import kotlin.math.min @@ -31,7 +30,7 @@ import kotlin.math.min */ public class SimResourceDistributorMaxMin( override val input: SimResourceProvider, - private val clock: Clock, + private val scheduler: SimResourceScheduler, private val listener: Listener? = null ) : SimResourceDistributor { override val outputs: Set @@ -220,7 +219,7 @@ public class SimResourceDistributorMaxMin( } } - assert(deadline >= clock.millis()) { "Deadline already passed" } + assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" } this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork @@ -337,7 +336,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable { + ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable { /** * The current command that is processed by the vCPU. */ @@ -402,6 +401,8 @@ public class SimResourceDistributorMaxMin( } } + private var isProcessing: Boolean = false + override fun interrupt() { // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead // to infinite recursion. @@ -409,10 +410,16 @@ public class SimResourceDistributorMaxMin( return } - super.interrupt() + try { + isProcessing = false - // Force the scheduler to re-schedule - schedule() + super.interrupt() + + // Force the scheduler to re-schedule + schedule() + } finally { + isProcessing = true + } } override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt new file mode 100644 index 00000000..f6a1a42e --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer. + */ +public interface SimResourceFlushable { + /** + * Flush the current active resource consumption. + * + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun flush(isIntermediate: Boolean) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt new file mode 100644 index 00000000..a228c47b --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import java.time.Clock + +/** + * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource + * providers and consumers. + * + * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more + * efficiently, reducing the overall work needed per update. + */ +public interface SimResourceScheduler { + /** + * The [Clock] associated with this scheduler. + */ + public val clock: Clock + + /** + * Schedule a direct interrupt for the resource context represented by [flushable]. + * + * @param flushable The resource context that needs to be flushed. + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false) + + /** + * Schedule an interrupt in the future for the resource context represented by [flushable]. + * + * This method will override earlier calls to this method for the same [flushable]. + * + * @param flushable The resource context that needs to be flushed. + * @param timestamp The timestamp when the interrupt should happen. + * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be + * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer + * will be asked to deliver a new command and is essentially interrupted. + */ + public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false) + + /** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ + public fun batch(block: () -> Unit) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt new file mode 100644 index 00000000..cdbb4a6c --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.utils.TimerScheduler +import java.time.Clock +import java.util.ArrayDeque +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after. + * + * @param clock The virtual simulation clock. + */ +public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler { + /** + * The [TimerScheduler] to actually schedule the interrupts. + */ + private val timers = TimerScheduler(context, clock) + + /** + * A flag to indicate that an interrupt is currently running already. + */ + private var isRunning: Boolean = false + + /** + * The queue of resources to be flushed. + */ + private val queue = ArrayDeque>() + + override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) { + queue.add(flushable to isIntermediate) + + if (isRunning) { + return + } + + flush() + } + + override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) { + timers.startSingleTimerTo(flushable, timestamp) { + schedule(flushable, isIntermediate) + } + } + + override fun batch(block: () -> Unit) { + val wasAlreadyRunning = isRunning + try { + isRunning = true + block() + } finally { + if (!wasAlreadyRunning) { + isRunning = false + } + } + } + + /** + * Flush the scheduled queue. + */ + private fun flush() { + val visited = mutableSetOf() + try { + isRunning = true + while (queue.isNotEmpty()) { + val (flushable, isIntermediate) = queue.poll() + flushable.flush(isIntermediate) + visited.add(flushable) + } + } finally { + isRunning = false + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index fe569096..3277b889 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.resources -import org.opendc.utils.TimerScheduler -import java.time.Clock import kotlin.math.ceil import kotlin.math.min @@ -31,13 +29,11 @@ import kotlin.math.min * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. * * @param initialCapacity The initial capacity of the resource. - * @param clock The virtual clock to track simulation time. * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( initialCapacity: Double, - private val clock: Clock, - private val scheduler: TimerScheduler + private val scheduler: SimResourceScheduler ) : SimResourceProvider { /** * The current processing speed of the resource. @@ -96,22 +92,21 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) { override fun onIdle(deadline: Long) { // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { - scheduler.startSingleTimerTo(this, deadline) { flush() } + scheduler.schedule(this, deadline) } } override fun onConsume(work: Double, limit: Double, deadline: Long) { val until = min(deadline, clock.millis() + getDuration(work, speed)) - - scheduler.startSingleTimerTo(this, until, ::flush) + scheduler.schedule(this, until) } override fun onFinish() { - scheduler.cancel(this) + cancel() ctx = null diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index c796c251..5dc1e68d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -23,14 +23,13 @@ package org.opendc.simulator.resources import kotlinx.coroutines.* -import java.time.Clock /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ public class SimResourceSwitchMaxMin( - clock: Clock, + scheduler: SimResourceScheduler, private val listener: Listener? = null ) : SimResourceSwitch { private val _outputs = mutableSetOf() @@ -49,13 +48,13 @@ public class SimResourceSwitchMaxMin( /** * The aggregator to aggregate the resources. */ - private val aggregator = SimResourceAggregatorMaxMin(clock) + private val aggregator = SimResourceAggregatorMaxMin(scheduler) /** * The distributor to distribute the aggregated resources. */ private val distributor = SimResourceDistributorMaxMin( - aggregator.output, clock, + aggregator.output, scheduler, object : SimResourceDistributorMaxMin.Listener { override fun onSliceFinish( switch: SimResourceDistributor, diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index e78bcdac..2b32300e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * Test suite for the [SimResourceAggregatorMaxMin] class. @@ -42,19 +41,19 @@ import org.opendc.utils.TimerScheduler internal class SimResourceAggregatorMaxMinTest { @Test fun testSingleCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val forwarder = SimResourceForwarder() val sources = listOf( forwarder, - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) val consumer = SimWorkConsumer(1.0, 0.5) val usage = mutableListOf() - val source = SimResourceSource(1.0, clock, scheduler) + val source = SimResourceSource(1.0, scheduler) val adapter = SimSpeedConsumerAdapter(forwarder, usage::add) source.startConsumer(adapter) @@ -73,12 +72,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -100,12 +99,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testOvercommit() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -127,12 +126,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testException() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -152,12 +151,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) @@ -177,12 +176,12 @@ internal class SimResourceAggregatorMaxMinTest { @Test fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - val aggregator = SimResourceAggregatorMaxMin(clock) + val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( - SimResourceSource(1.0, clock, scheduler), - SimResourceSource(1.0, clock, scheduler) + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) ) sources.forEach(aggregator::addInput) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 8c15ec71..2e2d6588 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -34,24 +34,26 @@ import org.opendc.simulator.core.runBlockingSimulation class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} override fun onFinish() {} } - context.flush() + context.flush(isIntermediate = false) } @Test fun testIntermediateFlush() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -66,10 +68,11 @@ class SimResourceContextTest { @Test fun testIntermediateFlushIdle() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -89,10 +92,11 @@ class SimResourceContextTest { @Test fun testDoubleStart() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, clock, consumer) { + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { override fun onIdle(deadline: Long) {} override fun onFinish() {} override fun onConsume(work: Double, limit: Double, deadline: Long) {} @@ -101,4 +105,43 @@ class SimResourceContextTest { context.start() assertThrows { context.start() } } + + @Test + fun testIdempodentCapacityChange() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val consumer = mockk(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit + + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { + override fun onIdle(deadline: Long) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + override fun onFinish() {} + } + + context.start() + context.capacity = 4200.0 + + verify(exactly = 0) { consumer.onEvent(any(), SimResourceEvent.Capacity) } + } + + @Test + fun testFailureNoInfiniteLoop() = runBlockingSimulation { + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val consumer = mockk(relaxUnitFun = true) + every { consumer.onNext(any()) } returns SimResourceCommand.Exit + every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") + every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") + + val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { + override fun onIdle(deadline: Long) {} + override fun onConsume(work: Double, limit: Double, deadline: Long) {} + override fun onFinish() {} + } + + context.start() + + delay(1) + + verify(exactly = 1) { consumer.onFailure(any(), any()) } + } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 361a1516..5e86088d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * A test suite for the [SimResourceSource] class. @@ -41,9 +40,9 @@ import org.opendc.utils.TimerScheduler class SimResourceSourceTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -58,15 +57,14 @@ class SimResourceSourceTest { assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { - scheduler.close() provider.close() } } @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val provider = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -79,16 +77,15 @@ class SimResourceSourceTest { assertEquals(3000, clock.millis()) verify(exactly = 1) { consumer.onEvent(any(), SimResourceEvent.Capacity) } } finally { - scheduler.close() provider.close() } } @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -103,7 +100,6 @@ class SimResourceSourceTest { assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } } finally { - scheduler.close() provider.close() } } @@ -114,9 +110,9 @@ class SimResourceSourceTest { */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = object : SimResourceConsumer { override fun onNext(ctx: SimResourceContext): SimResourceCommand { @@ -131,16 +127,15 @@ class SimResourceSourceTest { try { provider.consume(consumer) } finally { - scheduler.close() provider.close() } } @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) lateinit var resCtx: SimResourceContext val consumer = object : SimResourceConsumer { @@ -173,16 +168,15 @@ class SimResourceSourceTest { assertEquals(0, clock.millis()) } finally { - scheduler.close() provider.close() } } @Test fun testFailure() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onEvent(any(), eq(SimResourceEvent.Start)) } @@ -193,16 +187,15 @@ class SimResourceSourceTest { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -214,16 +207,15 @@ class SimResourceSourceTest { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -238,16 +230,15 @@ class SimResourceSourceTest { } } } finally { - scheduler.close() provider.close() } } @Test fun testClosedConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -260,16 +251,15 @@ class SimResourceSourceTest { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } @Test fun testCloseDuringConsumption() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -283,16 +273,15 @@ class SimResourceSourceTest { assertEquals(500, clock.millis()) } finally { - scheduler.close() provider.close() } } @Test fun testIdle() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -304,7 +293,6 @@ class SimResourceSourceTest { assertEquals(500, clock.millis()) } finally { - scheduler.close() provider.close() } } @@ -313,9 +301,9 @@ class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows { runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -325,7 +313,6 @@ class SimResourceSourceTest { try { provider.consume(consumer) } finally { - scheduler.close() provider.close() } } @@ -334,9 +321,9 @@ class SimResourceSourceTest { @Test fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val capacity = 4200.0 - val provider = SimResourceSource(capacity, clock, scheduler) + val provider = SimResourceSource(capacity, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } @@ -348,7 +335,6 @@ class SimResourceSourceTest { assertThrows { provider.consume(consumer) } } finally { - scheduler.close() provider.close() } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 1b1f7790..32b6d8ad 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -33,7 +33,6 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.utils.TimerScheduler /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -45,7 +44,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val speed = mutableListOf() @@ -61,7 +60,7 @@ internal class SimResourceSwitchExclusiveTest { ) val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) val forwarder = SimResourceForwarder() val adapter = SimSpeedConsumerAdapter(forwarder, speed::add) source.startConsumer(adapter) @@ -87,14 +86,14 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) switch.addInput(source) @@ -114,7 +113,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer { @@ -138,7 +137,7 @@ internal class SimResourceSwitchExclusiveTest { } val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) switch.addInput(source) @@ -159,14 +158,14 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk(relaxUnitFun = true) every { workload.onNext(any()) } returns SimResourceCommand.Consume(duration / 1000.0, 1.0) andThen SimResourceCommand.Exit val switch = SimResourceSwitchExclusive() - val source = SimResourceSource(3200.0, clock, scheduler) + val source = SimResourceSource(3200.0, scheduler) switch.addInput(source) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index 7416f277..e7dec172 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer -import org.opendc.utils.TimerScheduler /** * Test suite for the [SimResourceSwitch] implementations @@ -41,10 +40,10 @@ import org.opendc.utils.TimerScheduler internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - val switch = SimResourceSwitchMaxMin(clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val switch = SimResourceSwitchMaxMin(scheduler) - val sources = List(2) { SimResourceSource(2000.0, clock, scheduler) } + val sources = List(2) { SimResourceSource(2000.0, scheduler) } sources.forEach { switch.addInput(it) } val provider = switch.addOutput(1000.0) @@ -57,7 +56,6 @@ internal class SimResourceSwitchMaxMinTest { yield() } finally { switch.close() - scheduler.close() } } @@ -66,7 +64,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L @@ -99,16 +97,15 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(clock, listener) + val switch = SimResourceSwitchMaxMin(scheduler, listener) val provider = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(3200.0, clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, scheduler)) provider.consume(workload) yield() } finally { switch.close() - scheduler.close() } assertAll( @@ -124,7 +121,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) val listener = object : SimResourceSwitchMaxMin.Listener { var totalRequestedWork = 0L @@ -166,12 +163,12 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(clock, listener) + val switch = SimResourceSwitchMaxMin(scheduler, listener) val providerA = switch.addOutput(3200.0) val providerB = switch.addOutput(3200.0) try { - switch.addInput(SimResourceSource(3200.0, clock, scheduler)) + switch.addInput(SimResourceSource(3200.0, scheduler)) coroutineScope { launch { providerA.consume(workloadA) } @@ -181,7 +178,6 @@ internal class SimResourceSwitchMaxMinTest { yield() } finally { switch.close() - scheduler.close() } assertAll( { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index e3ca5845..880e1755 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * A test suite for the [SimResourceTransformer] class. @@ -42,8 +41,8 @@ internal class SimResourceTransformerTest { @Test fun testExitImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) launch { source.consume(forwarder) @@ -57,14 +56,13 @@ internal class SimResourceTransformerTest { }) forwarder.close() - scheduler.close() } @Test fun testExit() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) launch { source.consume(forwarder) @@ -124,8 +122,8 @@ internal class SimResourceTransformerTest { @Test fun testCancelStartedDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) @@ -143,8 +141,8 @@ internal class SimResourceTransformerTest { @Test fun testCancelPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) @@ -162,8 +160,8 @@ internal class SimResourceTransformerTest { @Test fun testExitPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(2000.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(2000.0, scheduler) val consumer = mockk(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit @@ -178,8 +176,8 @@ internal class SimResourceTransformerTest { @Test fun testAdjustCapacity() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) source.startConsumer(forwarder) @@ -197,8 +195,8 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val scheduler = TimerScheduler(coroutineContext, clock) - val source = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) source.startConsumer(forwarder) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index bf58b1b6..ac8b5814 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer -import org.opendc.utils.TimerScheduler /** * A test suite for the [SimWorkConsumer] class. @@ -36,8 +35,8 @@ import org.opendc.utils.TimerScheduler internal class SimWorkConsumerTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 1.0) @@ -51,8 +50,8 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = TimerScheduler(coroutineContext, clock) - val provider = SimResourceSource(1.0, clock, scheduler) + val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 0.5) -- cgit v1.2.3