diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
4 files changed, 90 insertions, 104 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index c7fa6a17..f4459c54 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -29,24 +29,6 @@ import java.time.Clock */ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator { /** - * The available resource provider contexts. - */ - protected val inputContexts: Set<SimResourceContext> - get() = _inputContexts - private val _inputContexts = mutableSetOf<SimResourceContext>() - - /** - * The output context. - */ - protected val outputContext: SimResourceContext - get() = context - - /** - * The commands to submit to the underlying input resources. - */ - protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf() - - /** * This method is invoked when the resource consumer consumes resources. */ protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) @@ -54,37 +36,29 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : /** * This method is invoked when the resource consumer enters an idle state. */ - protected open fun doIdle(deadline: Long) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Idle(deadline) - } - } + protected abstract fun doIdle(deadline: Long) /** * This method is invoked when the resource consumer finishes processing. */ - protected open fun doFinish(cause: Throwable?) { - for (input in inputContexts) { - commands[input] = SimResourceCommand.Exit - } - } + protected abstract fun doFinish(cause: Throwable?) /** * This method is invoked when an input context is started. */ - protected open fun onContextStarted(ctx: SimResourceContext) { - _inputContexts.add(ctx) - } + protected abstract fun onInputStarted(input: Input) - protected open fun onContextFinished(ctx: SimResourceContext) { - assert(_inputContexts.remove(ctx)) { "Lost context" } - } + /** + * This method is invoked when an input is stopped. + */ + protected abstract fun onInputFinished(input: Input) override fun addInput(input: SimResourceProvider) { check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) + _inputConsumers.add(consumer) input.startConsumer(consumer) } @@ -99,15 +73,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : override val inputs: Set<SimResourceProvider> get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() + private val _inputConsumers = mutableListOf<Consumer>() - private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { + protected val outputContext: SimResourceContext + get() = context + private val context = object : SimAbstractResourceContext(0.0, clock, _output) { override val remainingWork: Double get() { val now = clock.millis() return if (_remainingWorkFlush < now) { _remainingWorkFlush = now - _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it } + _inputConsumers.sumByDouble { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it } } else { _remainingWork } @@ -115,12 +92,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private var _remainingWork: Double = 0.0 private var _remainingWorkFlush: Long = Long.MIN_VALUE - override fun interrupt() { - super.interrupt() - - interruptAll() - } - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) override fun onIdle(deadline: Long) = doIdle(deadline) @@ -129,80 +100,80 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : doFinish(cause) super.onFinish(cause) - - interruptAll() } } /** - * A flag to indicate that an interrupt is active. - */ - private var isInterrupting: Boolean = false - - /** - * Schedule the work over the input resources. + * An input for the resource aggregator. */ - private fun doSchedule() { - context.flush(isIntermediate = true) - interruptAll() + public interface Input { + /** + * The [SimResourceContext] associated with the input. + */ + public val ctx: SimResourceContext + + /** + * Push the specified [SimResourceCommand] to the input. + */ + public fun push(command: SimResourceCommand) } /** - * Interrupt all inputs. + * An internal [SimResourceConsumer] implementation for aggregator inputs. */ - private fun interruptAll() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isInterrupting) { - return + private inner class Consumer : Input, SimResourceConsumer { + /** + * The resource context associated with the input. + */ + override val ctx: SimResourceContext + get() = _ctx!! + var _ctx: SimResourceContext? = null + + /** + * The resource command to run next. + */ + private var command: SimResourceCommand? = null + + /* Input */ + override fun push(command: SimResourceCommand) { + this.command = command + _ctx?.interrupt() } - try { - isInterrupting = true - - val iterator = _inputs.iterator() - while (iterator.hasNext()) { - val input = iterator.next() - input.interrupt() - - if (input.state != SimResourceState.Active) { - iterator.remove() - } - } - } finally { - isInterrupting = false - } - } - - /** - * An internal [SimResourceConsumer] implementation for aggregator inputs. - */ - private inner class Consumer : SimResourceConsumer { + /* SimResourceConsumer */ override fun onStart(ctx: SimResourceContext) { - onContextStarted(ctx) + _ctx = ctx onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { context.start() } + + onInputStarted(this) } override fun onNext(ctx: SimResourceContext): SimResourceCommand { - doSchedule() - - return commands[ctx] ?: SimResourceCommand.Idle() + var next = command + + return if (next != null) { + this.command = null + next + } else { + context.flush(isIntermediate = true) + next = command + this.command = null + next ?: SimResourceCommand.Idle() + } } override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { // Adjust capacity of output resource - context.capacity = inputContexts.sumByDouble { it.capacity } + context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 } } override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { - onContextFinished(ctx) - - super.onFinish(ctx, cause) + onInputFinished(this) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 08bc064e..5d550ad8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -28,36 +28,46 @@ import java.time.Clock * A [SimResourceAggregator] that distributes the load equally across the input resources. */ public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) { - private val consumers = mutableListOf<SimResourceContext>() + private val consumers = mutableListOf<Input>() override fun doConsume(work: Double, limit: Double, deadline: Long) { // Sort all consumers by their capacity - consumers.sortWith(compareBy { it.capacity }) + consumers.sortWith(compareBy { it.ctx.capacity }) // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { - val inputCapacity = input.capacity + val inputCapacity = input.ctx.capacity val fraction = inputCapacity / outputContext.capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work - commands[input] = - if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) - else - SimResourceCommand.Idle(deadline) + val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + else + SimResourceCommand.Idle(deadline) + input.push(command) } } - override fun onContextStarted(ctx: SimResourceContext) { - super.onContextStarted(ctx) + override fun doIdle(deadline: Long) { + for (input in consumers) { + input.push(SimResourceCommand.Idle(deadline)) + } + } - consumers.add(ctx) + override fun doFinish(cause: Throwable?) { + val iterator = consumers.iterator() + for (input in iterator) { + iterator.remove() + input.push(SimResourceCommand.Exit) + } } - override fun onContextFinished(ctx: SimResourceContext) { - super.onContextFinished(ctx) + override fun onInputStarted(input: Input) { + consumers.add(input) + } - consumers.remove(ctx) + override fun onInputFinished(input: Input) { + consumers.remove(input) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 025b0406..157db3cb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -112,7 +112,12 @@ public class SimResourceSource( override fun onFinish(cause: Throwable?) { scheduler.cancel(this) - cancel() + + ctx = null + + if (this@SimResourceSource.state != SimResourceState.Stopped) { + this@SimResourceSource.state = SimResourceState.Pending + } super.onFinish(cause) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index e272abb8..e78bcdac 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -139,7 +139,7 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } .returns(SimResourceCommand.Consume(1.0, 1.0)) - .andThenThrows(IllegalStateException()) + .andThenThrows(IllegalStateException("Test Exception")) try { assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } |
