diff options
2 files changed, 16 insertions, 80 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 8e0eb5f8..d064d7fa 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -32,7 +32,7 @@ public abstract class SimAbstractResourceAggregator( /** * This method is invoked when the resource consumer consumes resources. */ - protected abstract fun doConsume(limit: Double, duration: Long) + protected abstract fun doConsume(limit: Double) /** * This method is invoked when the resource consumer finishes processing. @@ -42,12 +42,12 @@ public abstract class SimAbstractResourceAggregator( /** * This method is invoked when an input context is started. */ - protected abstract fun onInputStarted(input: Input) + protected abstract fun onInputStarted(input: SimResourceContext) /** * This method is invoked when an input is stopped. */ - protected abstract fun onInputFinished(input: Input) + protected abstract fun onInputFinished(input: SimResourceContext) /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { @@ -95,7 +95,7 @@ public abstract class SimAbstractResourceAggregator( return object : SimResourceProviderLogic { override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { - doConsume(limit, duration) + doConsume(limit) return super.onConsume(ctx, now, limit, duration) } @@ -113,90 +113,25 @@ public abstract class SimAbstractResourceAggregator( } } } - - /** - * Flush the progress of the output if possible. - */ - fun flush() { - ctx?.flush() - } - } - - /** - * An input for the resource aggregator. - */ - public interface Input : AutoCloseable { - /** - * The [SimResourceContext] associated with the input. - */ - public val ctx: SimResourceContext - - /** - * Push to this input with the specified [limit] and [duration]. - */ - public fun push(limit: Double, duration: Long) - - /** - * Close the input for further input. - */ - public override fun close() } /** * An internal [SimResourceConsumer] implementation for aggregator inputs. */ - private inner class Consumer : Input, SimResourceConsumer { + private inner class Consumer : SimResourceConsumer { /** * The resource context associated with the input. */ - override val ctx: SimResourceContext - get() = _ctx!! private var _ctx: SimResourceContext? = null - /** - * The resource command to run next. - */ - private var _duration: Long = Long.MAX_VALUE - - /** - * A flag to indicate that the consumer should flush. - */ - private var _isPushed = false - private fun updateCapacity() { // Adjust capacity of output resource _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } } - /* Input */ - override fun push(limit: Double, duration: Long) { - _duration = duration - val ctx = _ctx - if (ctx != null) { - ctx.push(limit) - ctx.interrupt() - } - _isPushed = true - } - - override fun close() { - _duration = Long.MAX_VALUE - _isPushed = true - _ctx?.close() - } - /* SimResourceConsumer */ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - var next = _duration - - if (!_isPushed) { - _output.flush() - next = _duration - } - - _isPushed = false - _duration = Long.MAX_VALUE - return next + return Long.MAX_VALUE } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -205,10 +140,10 @@ public abstract class SimAbstractResourceAggregator( _ctx = ctx updateCapacity() - onInputStarted(this) + onInputStarted(ctx) } SimResourceEvent.Capacity -> updateCapacity() - SimResourceEvent.Exit -> onInputFinished(this) + SimResourceEvent.Exit -> onInputFinished(ctx) else -> {} } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index b258a368..f131ac6c 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -29,19 +29,20 @@ public class SimResourceAggregatorMaxMin( interpreter: SimResourceInterpreter, parent: SimResourceSystem? = null ) : SimAbstractResourceAggregator(interpreter, parent) { - private val consumers = mutableListOf<Input>() + private val consumers = mutableListOf<SimResourceContext>() - override fun doConsume(limit: Double, duration: Long) { + override fun doConsume(limit: Double) { // Sort all consumers by their capacity - consumers.sortWith(compareBy { it.ctx.capacity }) + consumers.sortWith(compareBy { it.capacity }) // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { - val inputCapacity = input.ctx.capacity + val inputCapacity = input.capacity val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction - input.push(grantedSpeed, duration) + input.push(grantedSpeed) + input.interrupt() } } @@ -53,11 +54,11 @@ public class SimResourceAggregatorMaxMin( } } - override fun onInputStarted(input: Input) { + override fun onInputStarted(input: SimResourceContext) { consumers.add(input) } - override fun onInputFinished(input: Input) { + override fun onInputFinished(input: SimResourceContext) { consumers.remove(input) } } |
