diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-21 11:01:07 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-05-03 21:17:48 +0200 |
| commit | 5c8cecaf5b8d24ffcd99ce45b922c5a853bd492d (patch) | |
| tree | 0a84584c196e459ccaa98ed5fa52246babe1feb6 /opendc-simulator/opendc-simulator-resources | |
| parent | 17ffe995ee06d5755cd3943a5ea14f982884009e (diff) | |
simulator: Simplify scheduling logic of resource aggregator
This change simplifies the scheduling logic of the resource aggregator.
Previously, after each scheduling cycle, each aggregated input was
interrupted. With the new approach, the scheduler can decide which ones
of the inputs to send a new command to.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
4 files changed, 90 insertions, 104 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index c7fa6a17..f4459c54 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -29,24 +29,6 @@ import java.time.Clock */ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { /** - * The available resource provider contexts. - */ - protected val inputContexts: Set<SimResourceContext> - get() = _inputContexts - private val _inputContexts = mutableSetOf<SimResourceContext>() - - /** - * The output context. - */ - protected val outputContext: SimResourceContext - get() = context - - /** - * The commands to submit to the underlying input resources. - */ - protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() - - /** * This method is invoked when the resource consumer consumes resources. */ protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) @@ -54,37 +36,29 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : /** * This method is invoked when the resource consumer enters an idle state. */ - protected open fun doIdle(deadline: Long) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Idle(deadline) - } - } + protected abstract fun doIdle(deadline: Long) /** * This method is invoked when the resource consumer finishes processing. */ - protected open fun doFinish(cause: Throwable?) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Exit - } - } + protected abstract fun doFinish(cause: Throwable?) /** * This method is invoked when an input context is started. */ - protected open fun onContextStarted(ctx: SimResourceContext) { - _inputContexts.add(ctx) - } + protected abstract fun onInputStarted(input: Input) - protected open fun onContextFinished(ctx: SimResourceContext) { - assert(_inputContexts.remove(ctx)) { "Lost context" } - } + /** + * This method is invoked when an input is stopped. + */ + protected abstract fun onInputFinished(input: Input) override fun addInput(input: SimResourceProvider) { check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) + _inputConsumers.add(consumer) input.startConsumer(consumer) } @@ -99,15 +73,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : override val inputs: Set<SimResourceProvider> get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() + private val _inputConsumers = mutableListOf<Consumer>() - private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { + protected val outputContext: SimResourceContext + get() = context + private val context = object : SimAbstractResourceContext(0.0, clock, _output) { override val remainingWork: Double get() { val now = clock.millis() return if (_remainingWorkFlush < now) { _remainingWorkFlush = now - _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + _inputConsumers.sumByDouble { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it } } else { _remainingWork } @@ -115,12 +92,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private var _remainingWork: Double = 0.0 private var _remainingWorkFlush: Long = Long.MIN_VALUE - override fun interrupt() { - super.interrupt() - - interruptAll() - } - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) override fun onIdle(deadline: Long) = doIdle(deadline) @@ -129,80 +100,80 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : doFinish(cause) super.onFinish(cause) - - interruptAll() } } /** - * A flag to indicate that an interrupt is active. - */ - private var isInterrupting: Boolean = false - - /** - * Schedule the work over the input resources. + * An input for the resource aggregator. */ - private fun doSchedule() { - context.flush(isIntermediate = true) - interruptAll() + public interface Input { + /** + * The [SimResourceContext] associated with the input. + */ + public val ctx: SimResourceContext + + /** + * Push the specified [SimResourceCommand] to the input. + */ + public fun push(command: SimResourceCommand) } /** - * Interrupt all inputs. + * An internal [SimResourceConsumer] implementation for aggregator inputs. */ - private fun interruptAll() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isInterrupting) { - return + private inner class Consumer : Input, SimResourceConsumer { + /** + * The resource context associated with the input. + */ + override val ctx: SimResourceContext + get() = _ctx!! + var _ctx: SimResourceContext? = null + + /** + * The resource command to run next. + */ + private var command: SimResourceCommand? = null + + /* Input */ + override fun push(command: SimResourceCommand) { + this.command = command + _ctx?.interrupt() } - try { - isInterrupting = true - - val iterator = _inputs.iterator() - while (iterator.hasNext()) { - val input = iterator.next() - input.interrupt() - - if (input.state != SimResourceState.Active) { - iterator.remove() - } - } - } finally { - isInterrupting = false - } - } - - /** - * An internal [SimResourceConsumer] implementation for aggregator inputs. - */ - private inner class Consumer : SimResourceConsumer { + /* SimResourceConsumer */ override fun onStart(ctx: SimResourceContext) { - onContextStarted(ctx) + _ctx = ctx onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { context.start() } + + onInputStarted(this) } override fun onNext(ctx: SimResourceContext): SimResourceCommand { - doSchedule() - - return commands[ctx] ?: SimResourceCommand.Idle() + var next = command + + return if (next != null) { + this.command = null + next + } else { + context.flush(isIntermediate = true) + next = command + this.command = null + next ?: SimResourceCommand.Idle() + } } override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { // Adjust capacity of output resource - context.capacity = inputContexts.sumByDouble { it.capacity } + context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } } override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - onContextFinished(ctx) - - super.onFinish(ctx, cause) + onInputFinished(this) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 08bc064e..5d550ad8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -28,36 +28,46 @@ import java.time.Clock * A [SimResourceAggregator] that distributes the load equally across the input resources. */ public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { - private val consumers = mutableListOf<SimResourceContext>() + private val consumers = mutableListOf<Input>() override fun doConsume(work: Double, limit: Double, deadline: Long) { // Sort all consumers by their capacity - consumers.sortWith(compareBy { it.capacity }) + consumers.sortWith(compareBy { it.ctx.capacity }) // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { - val inputCapacity = input.capacity + val inputCapacity = input.ctx.capacity val fraction = inputCapacity / outputContext.capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work - commands[input] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) + val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + input.push(command) } } - override fun onContextStarted(ctx: SimResourceContext) { - super.onContextStarted(ctx) + override fun doIdle(deadline: Long) { + for (input in consumers) { + input.push(SimResourceCommand.Idle(deadline)) + } + } - consumers.add(ctx) + override fun doFinish(cause: Throwable?) { + val iterator = consumers.iterator() + for (input in iterator) { + iterator.remove() + input.push(SimResourceCommand.Exit) + } } - override fun onContextFinished(ctx: SimResourceContext) { - super.onContextFinished(ctx) + override fun onInputStarted(input: Input) { + consumers.add(input) + } - consumers.remove(ctx) + override fun onInputFinished(input: Input) { + consumers.remove(input) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 025b0406..157db3cb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -112,7 +112,12 @@ public class SimResourceSource( override fun onFinish(cause: Throwable?) { scheduler.cancel(this) - cancel() + + ctx = null + + if (this@SimResourceSource.state != SimResourceState.Stopped) { + this@SimResourceSource.state = SimResourceState.Pending + } super.onFinish(cause) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index e272abb8..e78bcdac 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -139,7 +139,7 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) + .andThenThrows(IllegalStateException("Test Exception")) try { assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } |
