diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-03 14:03:12 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-06-03 14:03:12 +0200 |
| commit | 1303fe97510fb7987746722b3261c696f523fbd5 (patch) | |
| tree | d927dbd4c71a5ea6435f5994e8fa0bc90ef19b2c /opendc-simulator/opendc-simulator-resources/src | |
| parent | ae987fa84b2e061eb9fdfec5561e1c976aaa5a54 (diff) | |
| parent | cef12722f03a24a0e1e3b7502fb5e434d93f1664 (diff) | |
simulator: Improve simulator resource model (#142)
This pull request improves the existing simulator resource model
that is at the core of all simulation models in OpenDC.
Most importantly, we have changed the way of how metrics are reported by this layer.
* Add `SimResourceInterpreter` which is responsible for efficiently scheduling
communication between resources in OpenDC. The performance gain is in the 2x-5x range.
* Add uniform interface for exposing resource metrics (using `SimResourceCounters`).
* Split the CPUFreq subsystem in the compute simulator as it mixed responsibilities of
different layers.
**Breaking API Changes**
* Resource providers now accept a `SimResourceInterpreter` which is
responsible for coordinating the communication between resources.
* `ScalingGovernor` is not part of the machine layer anymore. Instead, it should
move in the OS/Hypervisor layer.
* Workloads should now start CPU consumers using `cpu.startConsumer`.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src')
32 files changed, 1761 insertions, 1160 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt index cd5f33bd..b45b2a2f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-resources/src/jmh/kotlin/org/opendc/simulator/resources/SimResourceBenchmarks.kt @@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimResourceBenchmarks { private lateinit var scope: SimulationCoroutineScope - private lateinit var scheduler: SimResourceScheduler + private lateinit var interpreter: SimResourceInterpreter @Setup fun setUp() { scope = SimulationCoroutineScope() - scheduler = SimResourceSchedulerTrampoline(scope.coroutineContext, scope.clock) + interpreter = SimResourceInterpreter(scope.coroutineContext, scope.clock) } @State(Scope.Thread) @@ -67,7 +67,7 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSource(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, scheduler) + val provider = SimResourceSource(4200.0, interpreter) return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -75,7 +75,7 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkForwardOverhead(state: Workload) { return scope.runBlockingSimulation { - val provider = SimResourceSource(4200.0, scheduler) + val provider = SimResourceSource(4200.0, interpreter) val forwarder = SimResourceForwarder() provider.startConsumer(forwarder) return@runBlockingSimulation forwarder.consume(SimTraceConsumer(state.trace)) @@ -85,12 +85,12 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchMaxMinSingleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(scheduler) + val switch = SimResourceSwitchMaxMin(interpreter) - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -98,14 +98,14 @@ class SimResourceBenchmarks { @Benchmark fun benchmarkSwitchMaxMinTripleConsumer(state: Workload) { return scope.runBlockingSimulation { - val switch = SimResourceSwitchMaxMin(scheduler) + val switch = SimResourceSwitchMaxMin(interpreter) - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - repeat(3) { i -> + repeat(3) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } @@ -117,10 +117,10 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() return@runBlockingSimulation provider.consume(SimTraceConsumer(state.trace)) } } @@ -130,12 +130,12 @@ class SimResourceBenchmarks { return scope.runBlockingSimulation { val switch = SimResourceSwitchExclusive() - switch.addInput(SimResourceSource(3000.0, scheduler)) - switch.addInput(SimResourceSource(3000.0, scheduler)) + switch.addInput(SimResourceSource(3000.0, interpreter)) + switch.addInput(SimResourceSource(3000.0, interpreter)) repeat(2) { launch { - val provider = switch.addOutput(3500.0) + val provider = switch.newOutput() provider.consume(SimTraceConsumer(state.trace)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 653b53e0..5fe7d7bb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -25,7 +25,10 @@ package org.opendc.simulator.resources /** * Abstract implementation of [SimResourceAggregator]. */ -public abstract class SimAbstractResourceAggregator(private val scheduler: SimResourceScheduler) : SimResourceAggregator { +public abstract class SimAbstractResourceAggregator( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? +) : SimResourceAggregator { /** * This method is invoked when the resource consumer consumes resources. */ @@ -39,7 +42,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe /** * This method is invoked when the resource consumer finishes processing. */ - protected abstract fun doFinish(cause: Throwable?) + protected abstract fun doFinish() /** * This method is invoked when an input context is started. @@ -51,8 +54,9 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe */ protected abstract fun onInputFinished(input: Input) + /* SimResourceAggregator */ override fun addInput(input: SimResourceProvider) { - check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" } + check(state != SimResourceState.Stopped) { "Aggregator has been stopped" } val consumer = Consumer() _inputs.add(input) @@ -60,42 +64,75 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe input.startConsumer(consumer) } - override fun close() { - output.close() - } - - override val output: SimResourceProvider - get() = _output - private val _output = SimResourceForwarder() - override val inputs: Set<SimResourceProvider> get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() private val _inputConsumers = mutableListOf<Consumer>() - protected val outputContext: SimResourceContext - get() = context - private val context = object : SimAbstractResourceContext(0.0, scheduler, _output) { - override val remainingWork: Double - get() { - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - _inputConsumers.sumOf { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it } - } else { - _remainingWork + /* SimResourceProvider */ + override val state: SimResourceState + get() = _output.state + + override val capacity: Double + get() = _output.capacity + + override val speed: Double + get() = _output.speed + + override val demand: Double + get() = _output.demand + + override val counters: SimResourceCounters + get() = _output.counters + + override fun startConsumer(consumer: SimResourceConsumer) { + _output.startConsumer(consumer) + } + + override fun cancel() { + _output.cancel() + } + + override fun interrupt() { + _output.interrupt() + } + + override fun close() { + _output.close() + } + + private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + doIdle(deadline) + return Long.MAX_VALUE } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + doConsume(work, limit, deadline) + return Long.MAX_VALUE + } - override fun onIdle(deadline: Long) = doIdle(deadline) + override fun onFinish(ctx: SimResourceControllableContext) { + doFinish() + } - override fun onFinish() { - doFinish(null) + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return _inputConsumers.sumOf { it.remainingWork } + } + } + } + + /** + * Flush the progress of the output if possible. + */ + fun flush() { + ctx?.flush() } } @@ -123,7 +160,13 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe */ override val ctx: SimResourceContext get() = _ctx!! - var _ctx: SimResourceContext? = null + private var _ctx: SimResourceContext? = null + + /** + * The remaining work of the consumer. + */ + val remainingWork: Double + get() = _ctx?.remainingWork ?: 0.0 /** * The resource command to run next. @@ -132,7 +175,7 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe private fun updateCapacity() { // Adjust capacity of output resource - context.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } + _output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 } } /* Input */ @@ -149,7 +192,8 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe this.command = null next } else { - context.flush(isIntermediate = true) + _output.flush() + next = command this.command = null next ?: SimResourceCommand.Idle() @@ -162,11 +206,6 @@ public abstract class SimAbstractResourceAggregator(private val scheduler: SimRe _ctx = ctx updateCapacity() - // Make sure we initialize the output if we have not done so yet - if (context.state == SimResourceState.Pending) { - context.start() - } - onInputStarted(this) } SimResourceEvent.Capacity -> updateCapacity() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt deleted file mode 100644 index c03bfad5..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock -import kotlin.math.max -import kotlin.math.min - -/** - * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. - */ -public abstract class SimAbstractResourceContext( - initialCapacity: Double, - private val scheduler: SimResourceScheduler, - private val consumer: SimResourceConsumer -) : SimResourceContext, SimResourceFlushable { - - /** - * The clock of the context. - */ - public override val clock: Clock - get() = scheduler.clock - - /** - * The capacity of the resource. - */ - public final override var capacity: Double = initialCapacity - set(value) { - val oldValue = field - - // Only changes will be propagated - if (value != oldValue) { - field = value - onCapacityChange() - } - } - - /** - * The amount of work still remaining at this instant. - */ - override val remainingWork: Double - get() { - val activeCommand = activeCommand ?: return 0.0 - val now = clock.millis() - - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(activeCommand, now).also { _remainingWork = it } - } else { - _remainingWork - } - } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - /** - * A flag to indicate the state of the context. - */ - public var state: SimResourceState = SimResourceState.Pending - private set - - /** - * The current processing speed of the resource. - */ - final override var speed: Double = 0.0 - private set - - /** - * This method is invoked when the resource will idle until the specified [deadline]. - */ - public abstract fun onIdle(deadline: Long) - - /** - * This method is invoked when the resource will be consumed until the specified [work] was processed or the - * [deadline] was reached. - */ - public abstract fun onConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer has finished. - */ - public abstract fun onFinish() - - /** - * Get the remaining work to process after a resource consumption. - * - * @param work The size of the resource consumption. - * @param speed The speed of consumption. - * @param duration The duration from the start of the consumption until now. - * @return The amount of work remaining. - */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - return if (duration > 0L) { - val processed = duration / 1000.0 * speed - max(0.0, work - processed) - } else { - 0.0 - } - } - - /** - * Start the consumer. - */ - public fun start() { - check(state == SimResourceState.Pending) { "Consumer is already started" } - - val now = clock.millis() - - state = SimResourceState.Active - isProcessing = true - latestFlush = now - - try { - consumer.onEvent(this, SimResourceEvent.Start) - activeCommand = interpret(consumer.onNext(this), now) - } catch (cause: Throwable) { - doFail(cause) - } finally { - isProcessing = false - } - } - - /** - * Immediately stop the consumer. - */ - public fun stop() { - try { - isProcessing = true - latestFlush = clock.millis() - - flush(isIntermediate = true) - doStop() - } finally { - isProcessing = false - } - } - - override fun flush(isIntermediate: Boolean) { - // Flush is no-op when the consumer is finished or not yet started - if (state != SimResourceState.Active) { - return - } - - val now = clock.millis() - - // Fast path: if the intermediate progress was already flushed at the current instant, we can skip it. - if (isIntermediate && latestFlush >= now) { - return - } - - try { - val activeCommand = activeCommand ?: return - val (timestamp, command) = activeCommand - - // Note: accessor is reliant on activeCommand being set - val remainingWork = remainingWork - - isProcessing = true - - val duration = now - timestamp - assert(duration >= 0) { "Flush in the past" } - - this.activeCommand = when (command) { - is SimResourceCommand.Idle -> { - // We should only continue processing the next command if: - // 1. The resource consumer reached its deadline. - // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Idle(command.deadline), now) - } - } - is SimResourceCommand.Consume -> { - // We should only continue processing the next command if: - // 1. The resource consumption was finished. - // 2. The resource capacity cannot satisfy the demand. - // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(now) - } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) - } - } - SimResourceCommand.Exit -> - // Flush may not be called when the resource consumer has finished - throw IllegalStateException() - } - - // Flush remaining work cache - _remainingWorkFlush = Long.MIN_VALUE - } catch (cause: Throwable) { - doFail(cause) - } finally { - latestFlush = now - isProcessing = false - } - } - - override fun interrupt() { - // Prevent users from interrupting the resource while they are constructing their next command, as this will - // only lead to infinite recursion. - if (isProcessing) { - return - } - - scheduler.schedule(this, isIntermediate = false) - } - - override fun toString(): String = "SimAbstractResourceContext[capacity=$capacity]" - - /** - * A flag to indicate that the resource is currently processing a command. - */ - private var isProcessing: Boolean = false - - /** - * The current command that is being processed. - */ - private var activeCommand: CommandWrapper? = null - - /** - * The latest timestamp at which the resource was flushed. - */ - private var latestFlush: Long = Long.MIN_VALUE - - /** - * Finish the consumer and resource provider. - */ - private fun doStop() { - val state = state - this.state = SimResourceState.Stopped - - if (state == SimResourceState.Active) { - activeCommand = null - try { - consumer.onEvent(this, SimResourceEvent.Exit) - onFinish() - } catch (cause: Throwable) { - doFail(cause) - } - } - } - - /** - * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. - */ - private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { - when (command) { - is SimResourceCommand.Idle -> { - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = 0.0 - - onIdle(deadline) - consumer.onEvent(this, SimResourceEvent.Run) - } - is SimResourceCommand.Consume -> { - val work = command.work - val limit = command.limit - val deadline = command.deadline - - require(deadline >= now) { "Deadline already passed" } - - speed = min(capacity, limit) - onConsume(work, limit, deadline) - consumer.onEvent(this, SimResourceEvent.Run) - } - is SimResourceCommand.Exit -> { - speed = 0.0 - - doStop() - - // No need to set the next active command - return null - } - } - - return CommandWrapper(now, command) - } - - /** - * Request the workload for more work. - */ - private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) - - /** - * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. - */ - private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { - val (timestamp, command) = wrapper - val duration = now - timestamp - return when (command) { - is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) - is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 - } - } - - /** - * Fail the resource consumer. - */ - private fun doFail(cause: Throwable) { - state = SimResourceState.Stopped - activeCommand = null - - try { - consumer.onFailure(this, cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - e.printStackTrace() - } - - onFinish() - } - - /** - * Indicate that the capacity of the resource has changed. - */ - private fun onCapacityChange() { - // Do not inform the consumer if it has not been started yet - if (state != SimResourceState.Active) { - return - } - - val isThrottled = speed > capacity - - consumer.onEvent(this, SimResourceEvent.Capacity) - - // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. - // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). - if (isThrottled) { - flush(isIntermediate = true) - } - } - - /** - * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. - */ - private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt new file mode 100644 index 00000000..de26f99e --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + +/** + * Abstract implementation of the [SimResourceProvider] which can be re-used by other implementations. + */ +public abstract class SimAbstractResourceProvider( + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem?, + initialCapacity: Double +) : SimResourceProvider { + /** + * The capacity of the resource. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** + * The current processing speed of the resource. + */ + public override val speed: Double + get() = ctx?.speed ?: 0.0 + + /** + * The resource processing speed demand at this instant. + */ + public override val demand: Double + get() = ctx?.demand ?: 0.0 + + /** + * The resource counters to track the execution metrics of the resource. + */ + public override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + + /** + * The [SimResourceControllableContext] that is currently running. + */ + protected var ctx: SimResourceControllableContext? = null + private set + + /** + * The state of the resource provider. + */ + final override var state: SimResourceState = SimResourceState.Pending + private set + + /** + * Construct the [SimResourceProviderLogic] instance for a new consumer. + */ + protected abstract fun createLogic(): SimResourceProviderLogic + + /** + * Start the specified [SimResourceControllableContext]. + */ + protected open fun start(ctx: SimResourceControllableContext) { + ctx.start() + } + + /** + * Update the counters of the resource provider. + */ + protected fun updateCounters(ctx: SimResourceContext, work: Double) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += work + counters.actual += work - remainingWork + counters.overcommit += remainingWork + } + + final override fun startConsumer(consumer: SimResourceConsumer) { + check(state == SimResourceState.Pending) { "Resource is in invalid state" } + val ctx = interpreter.newContext(consumer, createLogic(), parent) + + ctx.capacity = capacity + this.ctx = ctx + this.state = SimResourceState.Active + + start(ctx) + } + + override fun close() { + cancel() + state = SimResourceState.Stopped + } + + final override fun interrupt() { + ctx?.interrupt() + } + + final override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.close() + } + + if (state != SimResourceState.Stopped) { + state = SimResourceState.Pending + } + } + + override fun toString(): String = "SimAbstractResourceProvider[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt index bb4e6a2c..00972f43 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregator.kt @@ -25,24 +25,14 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] aggregates the capacity of multiple resources into a single resource. */ -public interface SimResourceAggregator : AutoCloseable { - /** - * The output resource provider to which resource consumers can be attached. - */ - public val output: SimResourceProvider - +public interface SimResourceAggregator : SimResourceProvider { /** * The input resources that will be switched between the output providers. */ public val inputs: Set<SimResourceProvider> /** - * Add the specified [input] to the switch. + * Add the specified [input] to the aggregator. */ public fun addInput(input: SimResourceProvider) - - /** - * End the lifecycle of the aggregator. - */ - public override fun close() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 5665abd1..c39c1aca 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -25,7 +25,10 @@ package org.opendc.simulator.resources /** * A [SimResourceAggregator] that distributes the load equally across the input resources. */ -public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimAbstractResourceAggregator(scheduler) { +public class SimResourceAggregatorMaxMin( + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null +) : SimAbstractResourceAggregator(interpreter, parent) { private val consumers = mutableListOf<Input>() override fun doConsume(work: Double, limit: Double, deadline: Long) { @@ -35,7 +38,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA // Divide the requests over the available capacity of the input resources fairly for (input in consumers) { val inputCapacity = input.ctx.capacity - val fraction = inputCapacity / outputContext.capacity + val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction val grantedWork = fraction * work @@ -53,7 +56,7 @@ public class SimResourceAggregatorMaxMin(scheduler: SimResourceScheduler) : SimA } } - override fun doFinish(cause: Throwable?) { + override fun doFinish() { val iterator = consumers.iterator() for (input in iterator) { iterator.remove() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 7c76c634..0d9a6106 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -45,6 +45,11 @@ public interface SimResourceContext { public val speed: Double /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** * The amount of work still remaining at this instant. */ public val remainingWork: Double diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt new file mode 100644 index 00000000..ceaca39a --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A controllable [SimResourceContext]. + * + * This interface is used by resource providers to control the resource context. + */ +public interface SimResourceControllableContext : SimResourceContext, AutoCloseable { + /** + * The state of the resource context. + */ + public val state: SimResourceState + + /** + * The capacity of the resource. + */ + public override var capacity: Double + + /** + * Start the resource context. + */ + public fun start() + + /** + * Stop the resource context. + */ + public override fun close() + + /** + * Invalidate the resource context's state. + * + * By invalidating the resource context's current state, the state is re-computed and the current progress is + * materialized during the next interpreter cycle. As a result, this call run asynchronously. See [flush] for the + * synchronous variant. + */ + public fun invalidate() + + /** + * Synchronously flush the progress of the resource context and materialize its current progress. + */ + public fun flush() +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt index f6a1a42e..725aa5bc 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceFlushable.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCounters.kt @@ -23,15 +23,26 @@ package org.opendc.simulator.resources /** - * An interface used by the [SimResourceScheduler] to flush the progress of resource consumer. + * An interface that tracks cumulative counts of the work performed by a resource. */ -public interface SimResourceFlushable { +public interface SimResourceCounters { /** - * Flush the current active resource consumption. - * - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. + * The amount of work that resource consumers wanted the resource to perform. */ - public fun flush(isIntermediate: Boolean) + public val demand: Double + + /** + * The amount of work performed by the resource. + */ + public val actual: Double + + /** + * The amount of work that could not be completed due to overcommitted resources. + */ + public val overcommit: Double + + /** + * Reset the resource counters. + */ + public fun reset() } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt index b2759b7f..e0333ff9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributor.kt @@ -25,19 +25,14 @@ package org.opendc.simulator.resources /** * A [SimResourceDistributor] distributes the capacity of some resource over multiple resource consumers. */ -public interface SimResourceDistributor : AutoCloseable { +public interface SimResourceDistributor : SimResourceConsumer { /** * The output resource providers to which resource consumers can be attached. */ public val outputs: Set<SimResourceProvider> /** - * The input resource that will be distributed over the consumers. + * Create a new output for the distributor. */ - public val input: SimResourceProvider - - /** - * Add an output to the switch with the specified [capacity]. - */ - public fun addOutput(capacity: Double): SimResourceProvider + public fun newOutput(): SimResourceProvider } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index a76cb1e3..be9e89fb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -29,23 +29,22 @@ import kotlin.math.min * A [SimResourceDistributor] that distributes the capacity of a resource over consumers using max-min fair sharing. */ public class SimResourceDistributorMaxMin( - override val input: SimResourceProvider, - private val scheduler: SimResourceScheduler, - private val listener: Listener? = null + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null ) : SimResourceDistributor { override val outputs: Set<SimResourceProvider> get() = _outputs - private val _outputs = mutableSetOf<OutputProvider>() + private val _outputs = mutableSetOf<Output>() /** - * The active output contexts. + * The resource context of the consumer. */ - private val outputContexts: MutableList<OutputContext> = mutableListOf() + private var ctx: SimResourceContext? = null /** - * The total speed requested by the output resources. + * The active outputs. */ - private var totalRequestedSpeed = 0.0 + private val activeOutputs: MutableList<Output> = mutableListOf() /** * The total amount of work requested by the output resources. @@ -57,147 +56,83 @@ public class SimResourceDistributorMaxMin( */ private var totalAllocatedSpeed = 0.0 - /** - * The total allocated work requested for the output resources. - */ - private var totalAllocatedWork = 0.0 - - /** - * The amount of work that could not be performed due to over-committing resources. - */ - private var totalOvercommittedWork = 0.0 - - /** - * The amount of work that was lost due to interference. - */ - private var totalInterferedWork = 0.0 - - /** - * A flag to indicate that the switch is closed. - */ - private var isClosed: Boolean = false - - /** - * An internal [SimResourceConsumer] implementation for switch inputs. - */ - private val consumer = object : SimResourceConsumer { - /** - * The resource context of the consumer. - */ - private lateinit var ctx: SimResourceContext - - val remainingWork: Double - get() = ctx.remainingWork - - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx.capacity) - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - } - SimResourceEvent.Exit -> { - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - else -> {} - } - } - } - - /** - * The total amount of remaining work. - */ - private val totalRemainingWork: Double - get() = consumer.remainingWork - - override fun addOutput(capacity: Double): SimResourceProvider { - check(!isClosed) { "Distributor has been closed" } - - val provider = OutputProvider(capacity) + /* SimResourceDistributor */ + override fun newOutput(): SimResourceProvider { + val provider = Output(ctx?.capacity ?: 0.0) _outputs.add(provider) return provider } - override fun close() { - if (!isClosed) { - isClosed = true - input.cancel() - } + /* SimResourceConsumer */ + override fun onNext(ctx: SimResourceContext): SimResourceCommand { + return doNext(ctx.capacity) } - init { - input.startConsumer(consumer) - } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + updateCapacity(ctx) + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() - /** - * Indicate that the workloads should be re-scheduled. - */ - private fun schedule() { - input.interrupt() + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() + + output.close() + } + } + SimResourceEvent.Capacity -> updateCapacity(ctx) + else -> {} + } } /** - * Schedule the work over the physical CPUs. + * Schedule the work of the outputs. */ - private fun doSchedule(capacity: Double): SimResourceCommand { - // If there is no work yet, mark all inputs as idle. - if (outputContexts.isEmpty()) { + private fun doNext(capacity: Double): SimResourceCommand { + // If there is no work yet, mark the input as idle. + if (activeOutputs.isEmpty()) { return SimResourceCommand.Idle() } - val maxUsage = capacity var duration: Double = Double.MAX_VALUE var deadline: Long = Long.MAX_VALUE - var availableSpeed = maxUsage + var availableSpeed = capacity var totalRequestedSpeed = 0.0 var totalRequestedWork = 0.0 - // Flush the work of the outputs - var outputIterator = outputContexts.listIterator() - while (outputIterator.hasNext()) { - val output = outputIterator.next() - - output.flush(isIntermediate = true) + // Pull in the work of the outputs + val outputIterator = activeOutputs.listIterator() + for (output in outputIterator) { + output.pull() - if (output.activeCommand == SimResourceCommand.Exit) { - // Apparently the output consumer has exited, so remove it from the scheduling queue. + // Remove outputs that have finished + if (output.isFinished) { outputIterator.remove() } } - // Sort the outputs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - outputContexts.sort() + // Sort in-place the outputs based on their requested usage. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeOutputs.sort() // Divide the available input capacity fairly across the outputs using max-min fair sharing - outputIterator = outputContexts.listIterator() - var remaining = outputContexts.size - while (outputIterator.hasNext()) { - val output = outputIterator.next() + var remaining = activeOutputs.size + for (output in activeOutputs) { val availableShare = availableSpeed / remaining-- when (val command = output.activeCommand) { is SimResourceCommand.Idle -> { - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) - output.actualSpeed = 0.0 } is SimResourceCommand.Consume -> { val grantedSpeed = min(output.allowedSpeed, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue deadline = min(deadline, command.deadline) // Ignore idle computation @@ -212,216 +147,139 @@ public class SimResourceDistributorMaxMin( output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed - // The duration that we want to run is that of the shortest request from an output + // The duration that we want to run is that of the shortest request of an output duration = min(duration, command.work / grantedSpeed) } SimResourceCommand.Exit -> assert(false) { "Did not expect output to be stopped" } } } - assert(deadline >= scheduler.clock.millis()) { "Deadline already passed" } + assert(deadline >= interpreter.clock.millis()) { "Deadline already passed" } - this.totalRequestedSpeed = totalRequestedSpeed this.totalRequestedWork = totalRequestedWork - this.totalAllocatedSpeed = maxUsage - availableSpeed - this.totalAllocatedWork = min(totalRequestedWork, totalAllocatedSpeed * duration) + this.totalAllocatedSpeed = capacity - availableSpeed + val totalAllocatedWork = min( + totalRequestedWork, + totalAllocatedSpeed * min((deadline - interpreter.clock.millis()) / 1000.0, duration) + ) return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalRequestedWork, totalAllocatedSpeed, deadline) else SimResourceCommand.Idle(deadline) } - /** - * Obtain the next command to perform. - */ - private fun doNext(capacity: Double): SimResourceCommand { - val totalRequestedWork = totalRequestedWork.toLong() - val totalRemainingWork = totalRemainingWork.toLong() - val totalAllocatedWork = totalAllocatedWork.toLong() - val totalRequestedSpeed = totalRequestedSpeed - val totalAllocatedSpeed = totalAllocatedSpeed - - // Force all inputs to re-schedule their work. - val command = doSchedule(capacity) - - // Report metrics - listener?.onSliceFinish( - this, - totalRequestedWork, - totalAllocatedWork - totalRemainingWork, - totalOvercommittedWork.toLong(), - totalInterferedWork.toLong(), - totalAllocatedSpeed, - totalRequestedSpeed - ) - - totalInterferedWork = 0.0 - totalOvercommittedWork = 0.0 - - return command + private fun updateCapacity(ctx: SimResourceContext) { + for (output in _outputs) { + output.capacity = ctx.capacity + } } /** - * Event listener for hypervisor events. + * An internal [SimResourceProvider] implementation for switch outputs. */ - public interface Listener { + private inner class Output(capacity: Double) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceProviderLogic, Comparable<Output> { /** - * This method is invoked when a slice is finished. + * The current command that is processed by the resource. */ - public fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } + var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - /** - * An internal [SimResourceProvider] implementation for switch outputs. - */ - private inner class OutputProvider(val capacity: Double) : SimResourceProvider { /** - * The [OutputContext] that is currently running. + * The processing speed that is allowed by the model constraints. */ - private var ctx: OutputContext? = null + var allowedSpeed: Double = 0.0 - override var state: SimResourceState = SimResourceState.Pending - internal set + /** + * The actual processing speed. + */ + var actualSpeed: Double = 0.0 - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource cannot be consumed" } + /** + * A flag to indicate that the output is finished. + */ + val isFinished + get() = activeCommand is SimResourceCommand.Exit - val ctx = OutputContext(this, consumer) - this.ctx = ctx - this.state = SimResourceState.Active - outputContexts += ctx + /** + * The timestamp at which we received the last command. + */ + private var lastCommandTimestamp: Long = Long.MIN_VALUE - ctx.start() - schedule() - } + /* SimAbstractResourceProvider */ + override fun createLogic(): SimResourceProviderLogic = this - override fun close() { - cancel() + override fun start(ctx: SimResourceControllableContext) { + activeOutputs += this - if (state != SimResourceState.Stopped) { - state = SimResourceState.Stopped - _outputs.remove(this) + interpreter.batch { + ctx.start() + // Interrupt the input to re-schedule the resources + this@SimResourceDistributorMaxMin.ctx?.interrupt() } } - override fun interrupt() { - ctx?.interrupt() - } + override fun close() { + val state = state - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } + super.close() if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending + _outputs.remove(this) } } - } - - /** - * A [SimAbstractResourceContext] for the output resources. - */ - private inner class OutputContext( - private val provider: OutputProvider, - consumer: SimResourceConsumer - ) : SimAbstractResourceContext(provider.capacity, scheduler, consumer), Comparable<OutputContext> { - /** - * The current command that is processed by the vCPU. - */ - var activeCommand: SimResourceCommand = SimResourceCommand.Idle() - - /** - * The processing speed that is allowed by the model constraints. - */ - var allowedSpeed: Double = 0.0 - - /** - * The actual processing speed. - */ - var actualSpeed: Double = 0.0 - - private fun reportOvercommit() { - val remainingWork = remainingWork - totalOvercommittedWork += remainingWork - } - - override fun onIdle(deadline: Long) { - reportOvercommit() + /* SimResourceProviderLogic */ + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) - } + lastCommandTimestamp = ctx.clock.millis() - override fun onConsume(work: Double, limit: Double, deadline: Long) { - reportOvercommit() + return Long.MAX_VALUE + } - allowedSpeed = speed + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + allowedSpeed = ctx.speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) + lastCommandTimestamp = ctx.clock.millis() + + return Long.MAX_VALUE } - override fun onFinish() { - reportOvercommit() + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } + override fun onFinish(ctx: SimResourceControllableContext) { activeCommand = SimResourceCommand.Exit - provider.cancel() + lastCommandTimestamp = ctx.clock.millis() } - override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { - // Apply performance interference model - val performanceScore = 1.0 + override fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - // Compute the remaining amount of work return if (work > 0.0) { - // Compute the fraction of compute time allocated to the VM + // Compute the fraction of compute time allocated to the output val fraction = actualSpeed / totalAllocatedSpeed - // Compute the work that was actually granted to the VM. - val processingAvailable = max(0.0, totalAllocatedWork - totalRemainingWork) * fraction - val processed = processingAvailable * performanceScore - - val interferedWork = processingAvailable - processed - - totalInterferedWork += interferedWork - - max(0.0, work - processed) + // Compute the work that was actually granted to the output. + val processingAvailable = max(0.0, totalRequestedWork - totalRemainingWork) * fraction + max(0.0, work - processingAvailable) } else { 0.0 } } - private var isProcessing: Boolean = false - - override fun interrupt() { - // Prevent users from interrupting the CPU while it is constructing its next command, this will only lead - // to infinite recursion. - if (isProcessing) { - return - } - - try { - isProcessing = false + /* Comparable */ + override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed) - super.interrupt() - - // Force the scheduler to re-schedule - schedule() - } finally { - isProcessing = true + /** + * Pull the next command if necessary. + */ + fun pull() { + val ctx = ctx + if (ctx != null && lastCommandTimestamp < ctx.clock.millis()) { + ctx.flush() } } - - override fun compareTo(other: OutputContext): Int = allowedSpeed.compareTo(other.allowedSpeed) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt new file mode 100644 index 00000000..82631377 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The resource interpreter is responsible for managing the interaction between resource consumer and provider. + * + * The interpreter centralizes the scheduling logic of state updates of resource context, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public interface SimResourceInterpreter { + /** + * The [Clock] associated with this interpreter. + */ + public val clock: Clock + + /** + * Create a new [SimResourceControllableContext] with the given [provider]. + * + * @param consumer The consumer logic. + * @param provider The logic of the resource provider. + * @param parent The system to which the resource context belongs. + */ + public fun newContext( + consumer: SimResourceConsumer, + provider: SimResourceProviderLogic, + parent: SimResourceSystem? = null + ): SimResourceControllableContext + + /** + * Start batching the execution of resource updates until [popBatch] is called. + * + * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs + * simultaneously) in a single state update. + * + * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the + * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called + * the same amount of times. To simplify batching, see [batch]. + */ + public fun pushBatch() + + /** + * Stop the batching of resource updates and run the interpreter on the batch. + * + * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. + */ + public fun popBatch() + + public companion object { + /** + * Construct a new [SimResourceInterpreter] implementation. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ + @JvmName("create") + public operator fun invoke(context: CoroutineContext, clock: Clock): SimResourceInterpreter { + return SimResourceInterpreterImpl(context, clock) + } + } +} + +/** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ +public inline fun SimResourceInterpreter.batch(block: () -> Unit) { + try { + pushBatch() + block() + } finally { + popBatch() + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt index 2f567a5e..f709ca17 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProvider.kt @@ -27,7 +27,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException /** - * A [SimResourceProvider] provides some resource of type [R]. + * A [SimResourceProvider] provides a resource that can be consumed by a [SimResourceConsumer]. */ public interface SimResourceProvider : AutoCloseable { /** @@ -36,6 +36,26 @@ public interface SimResourceProvider : AutoCloseable { public val state: SimResourceState /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The current processing speed of the resource. + */ + public val speed: Double + + /** + * The resource processing speed demand at this instant. + */ + public val demand: Double + + /** + * The resource counters to track the execution metrics of the resource. + */ + public val counters: SimResourceCounters + + /** * Start the specified [resource consumer][consumer] in the context of this resource provider asynchronously. * * @throws IllegalStateException if there is already a consumer active or the resource lifetime has ended. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt new file mode 100644 index 00000000..5231ecf5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +import kotlin.math.max + +/** + * The logic of a resource provider. + */ +public interface SimResourceProviderLogic { + /** + * This method is invoked when the resource is reported to idle until the specified [deadline]. + * + * @param ctx The context in which the provider runs. + * @param deadline The deadline that was requested by the resource consumer. + * @return The instant at which to resume the consumer. + */ + public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long + + /** + * This method is invoked when the resource will be consumed until the specified amount of [work] was processed + * or [deadline] is reached. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + * @param limit The limit on the work rate of the resource consumer. + * @param deadline The deadline that was requested by the resource consumer. + * @return The instant at which to resume the consumer. + */ + public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long + + /** + * This method is invoked when the progress of the resource consumer is materialized. + * + * @param ctx The context in which the provider runs. + * @param work The amount of work that was requested by the resource consumer. + */ + public fun onUpdate(ctx: SimResourceControllableContext, work: Double) {} + + /** + * This method is invoked when the resource consumer has finished. + */ + public fun onFinish(ctx: SimResourceControllableContext) + + /** + * Get the remaining work to process after a resource consumption. + * + * @param work The size of the resource consumption. + * @param speed The speed of consumption. + * @param duration The duration from the start of the consumption until now. + * @return The amount of work remaining. + */ + public fun getRemainingWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { + return if (duration > 0L) { + val processed = duration / 1000.0 * speed + max(0.0, work - processed) + } else { + 0.0 + } + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt deleted file mode 100644 index a228c47b..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceScheduler.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import java.time.Clock - -/** - * A resource scheduler is responsible for scheduling the communication and synchronization between multiple resource - * providers and consumers. - * - * By centralizing the scheduling logic, updates of resources within a single system can be scheduled and tracked more - * efficiently, reducing the overall work needed per update. - */ -public interface SimResourceScheduler { - /** - * The [Clock] associated with this scheduler. - */ - public val clock: Clock - - /** - * Schedule a direct interrupt for the resource context represented by [flushable]. - * - * @param flushable The resource context that needs to be flushed. - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean = false) - - /** - * Schedule an interrupt in the future for the resource context represented by [flushable]. - * - * This method will override earlier calls to this method for the same [flushable]. - * - * @param flushable The resource context that needs to be flushed. - * @param timestamp The timestamp when the interrupt should happen. - * @param isIntermediate A flag to indicate that the intermediate progress of the resource consumer should be - * flushed, but without interrupting the resource consumer to submit a new command. If false, the resource consumer - * will be asked to deliver a new command and is essentially interrupted. - */ - public fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean = false) - - /** - * Batch the execution of several interrupts into a single call. - * - * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. - */ - public fun batch(block: () -> Unit) -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt deleted file mode 100644 index cdbb4a6c..00000000 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSchedulerTrampoline.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.resources - -import org.opendc.utils.TimerScheduler -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.CoroutineContext - -/** - * A [SimResourceScheduler] queues all interrupts that occur during execution to be executed after. - * - * @param clock The virtual simulation clock. - */ -public class SimResourceSchedulerTrampoline(context: CoroutineContext, override val clock: Clock) : SimResourceScheduler { - /** - * The [TimerScheduler] to actually schedule the interrupts. - */ - private val timers = TimerScheduler<Any>(context, clock) - - /** - * A flag to indicate that an interrupt is currently running already. - */ - private var isRunning: Boolean = false - - /** - * The queue of resources to be flushed. - */ - private val queue = ArrayDeque<Pair<SimResourceFlushable, Boolean>>() - - override fun schedule(flushable: SimResourceFlushable, isIntermediate: Boolean) { - queue.add(flushable to isIntermediate) - - if (isRunning) { - return - } - - flush() - } - - override fun schedule(flushable: SimResourceFlushable, timestamp: Long, isIntermediate: Boolean) { - timers.startSingleTimerTo(flushable, timestamp) { - schedule(flushable, isIntermediate) - } - } - - override fun batch(block: () -> Unit) { - val wasAlreadyRunning = isRunning - try { - isRunning = true - block() - } finally { - if (!wasAlreadyRunning) { - isRunning = false - } - } - } - - /** - * Flush the scheduled queue. - */ - private fun flush() { - val visited = mutableSetOf<SimResourceFlushable>() - try { - isRunning = true - while (queue.isNotEmpty()) { - val (flushable, isIntermediate) = queue.poll() - flushable.flush(isIntermediate) - visited.add(flushable) - } - } finally { - isRunning = false - } - } -} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3277b889..9f062cc3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,98 +26,39 @@ import kotlin.math.ceil import kotlin.math.min /** - * A [SimResourceSource] represents a source for some resource of type [R] that provides bounded processing capacity. + * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. * * @param initialCapacity The initial capacity of the resource. - * @param scheduler The scheduler to schedule the interrupts. + * @param interpreter The interpreter that is used for managing the resource contexts. + * @param parent The parent resource system. */ public class SimResourceSource( initialCapacity: Double, - private val scheduler: SimResourceScheduler -) : SimResourceProvider { - /** - * The current processing speed of the resource. - */ - public val speed: Double - get() = ctx?.speed ?: 0.0 - - /** - * The capacity of the resource. - */ - public var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The [Context] that is currently running. - */ - private var ctx: Context? = null - - override var state: SimResourceState = SimResourceState.Pending - private set - - override fun startConsumer(consumer: SimResourceConsumer) { - check(state == SimResourceState.Pending) { "Resource is in invalid state" } - val ctx = Context(consumer) - - this.ctx = ctx - this.state = SimResourceState.Active - - ctx.start() - } - - override fun close() { - cancel() - state = SimResourceState.Stopped - } - - override fun interrupt() { - ctx?.interrupt() - } - - override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.stop() - } - - if (state != SimResourceState.Stopped) { - state = SimResourceState.Pending - } - } - - /** - * Internal implementation of [SimResourceContext] for this class. - */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, scheduler, consumer) { - override fun onIdle(deadline: Long) { - // Do not resume if deadline is "infinite" - if (deadline != Long.MAX_VALUE) { - scheduler.schedule(this, deadline) + private val interpreter: SimResourceInterpreter, + private val parent: SimResourceSystem? = null +) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { + override fun createLogic(): SimResourceProviderLogic { + return object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { + return deadline } - } - - override fun onConsume(work: Double, limit: Double, deadline: Long) { - val until = min(deadline, clock.millis() + getDuration(work, speed)) - scheduler.schedule(this, until) - } - override fun onFinish() { - cancel() + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + return min(deadline, ctx.clock.millis() + getDuration(work, speed)) + } - ctx = null + override fun onUpdate(ctx: SimResourceControllableContext, work: Double) { + updateCounters(ctx, work) + } - if (this@SimResourceSource.state != SimResourceState.Stopped) { - this@SimResourceSource.state = SimResourceState.Pending + override fun onFinish(ctx: SimResourceControllableContext) { + cancel() } } - - override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + override fun toString(): String = "SimResourceSource[capacity=$capacity]" + /** * Compute the duration that a resource consumption will take with the specified [speed]. */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt index 53fec16a..e224285e 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitch.kt @@ -37,9 +37,14 @@ public interface SimResourceSwitch : AutoCloseable { public val inputs: Set<SimResourceProvider> /** - * Add an output to the switch with the specified [capacity]. + * The resource counters to track the execution metrics of all switch resources. */ - public fun addOutput(capacity: Double): SimResourceProvider + public val counters: SimResourceCounters + + /** + * Create a new output on the switch. + */ + public fun newOutput(): SimResourceProvider /** * Add the specified [input] to the switch. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 1a9dd0bc..2950af80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -44,11 +44,28 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { override val inputs: Set<SimResourceProvider> get() = _inputs - override fun addOutput(capacity: Double): SimResourceProvider { + override val counters: SimResourceCounters = object : SimResourceCounters { + override val demand: Double + get() = _inputs.sumOf { it.counters.demand } + override val actual: Double + get() = _inputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _inputs.sumOf { it.counters.overcommit } + + override fun reset() { + for (input in _inputs) { + input.counters.reset() + } + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } check(availableResources.isNotEmpty()) { "No capacity to serve request" } val forwarder = availableResources.poll() - val output = Provider(capacity, forwarder) + val output = Provider(forwarder) _outputs += output return output } @@ -84,13 +101,9 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { _inputs.forEach(SimResourceProvider::cancel) } - private inner class Provider( - private val capacity: Double, - private val forwarder: SimResourceTransformer - ) : SimResourceProvider by forwarder { + private inner class Provider(private val forwarder: SimResourceTransformer) : SimResourceProvider by forwarder { override fun close() { // We explicitly do not close the forwarder here in order to re-use it across output resources. - _outputs -= this availableResources += forwarder } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 5dc1e68d..684a1b52 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -22,23 +22,31 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.* - /** * A [SimResourceSwitch] implementation that switches resource consumptions over the available resources using max-min * fair sharing. */ public class SimResourceSwitchMaxMin( - scheduler: SimResourceScheduler, - private val listener: Listener? = null + interpreter: SimResourceInterpreter, + parent: SimResourceSystem? = null ) : SimResourceSwitch { - private val _outputs = mutableSetOf<SimResourceProvider>() + /** + * The output resource providers to which resource consumers can be attached. + */ override val outputs: Set<SimResourceProvider> - get() = _outputs + get() = distributor.outputs - private val _inputs = mutableSetOf<SimResourceProvider>() + /** + * The input resources that will be switched between the output providers. + */ override val inputs: Set<SimResourceProvider> - get() = _inputs + get() = aggregator.inputs + + /** + * The resource counters to track the execution metrics of all switch resources. + */ + override val counters: SimResourceCounters + get() = aggregator.counters /** * A flag to indicate that the switch was closed. @@ -48,37 +56,24 @@ public class SimResourceSwitchMaxMin( /** * The aggregator to aggregate the resources. */ - private val aggregator = SimResourceAggregatorMaxMin(scheduler) + private val aggregator = SimResourceAggregatorMaxMin(interpreter, parent) /** * The distributor to distribute the aggregated resources. */ - private val distributor = SimResourceDistributorMaxMin( - aggregator.output, scheduler, - object : SimResourceDistributorMaxMin.Listener { - override fun onSliceFinish( - switch: SimResourceDistributor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - listener?.onSliceFinish(this@SimResourceSwitchMaxMin, requestedWork, grantedWork, overcommittedWork, interferedWork, cpuUsage, cpuDemand) - } - } - ) + private val distributor = SimResourceDistributorMaxMin(interpreter, parent) + + init { + aggregator.startConsumer(distributor) + } /** - * Add an output to the switch represented by [resource]. + * Add an output to the switch. */ - override fun addOutput(capacity: Double): SimResourceProvider { + override fun newOutput(): SimResourceProvider { check(!isClosed) { "Switch has been closed" } - val provider = distributor.addOutput(capacity) - _outputs.add(provider) - return provider + return distributor.newOutput() } /** @@ -93,26 +88,7 @@ public class SimResourceSwitchMaxMin( override fun close() { if (!isClosed) { isClosed = true - distributor.close() aggregator.close() } } - - /** - * Event listener for hypervisor events. - */ - public interface Listener { - /** - * This method is invoked when a slice is finished. - */ - public fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt new file mode 100644 index 00000000..609262cb --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSystem.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources + +/** + * A system of possible multiple sub-resources. + * + * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the + * resource provider. + */ +public interface SimResourceSystem { + /** + * The parent system to which this system belongs or `null` if it has no parent. + */ + public val parent: SimResourceSystem? + + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param timestamp The timestamp at which the system converged. + */ + public fun onConverge(timestamp: Long) +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index 32f3f573..fd3d1230 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.resources +import org.opendc.simulator.resources.impl.SimResourceCountersImpl + /** * A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider. * @@ -53,6 +55,19 @@ public class SimResourceTransformer( override var state: SimResourceState = SimResourceState.Pending private set + override val capacity: Double + get() = ctx?.capacity ?: 0.0 + + override val speed: Double + get() = ctx?.speed ?: 0.0 + + override val demand: Double + get() = ctx?.demand ?: 0.0 + + override val counters: SimResourceCounters + get() = _counters + private val _counters = SimResourceCountersImpl() + override fun startConsumer(consumer: SimResourceConsumer) { check(state == SimResourceState.Pending) { "Resource is in invalid state" } @@ -97,10 +112,15 @@ public class SimResourceTransformer( start() } + updateCounters(ctx) + return if (state == SimResourceState.Stopped) { SimResourceCommand.Exit } else if (delegate != null) { val command = transform(ctx, delegate.onNext(ctx)) + + _work = if (command is SimResourceCommand.Consume) command.work else 0.0 + if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards @@ -169,6 +189,22 @@ public class SimResourceTransformer( state = SimResourceState.Pending } } + + /** + * Counter to track the current submitted work. + */ + private var _work = 0.0 + + /** + * Update the resource counters for the transformer. + */ + private fun updateCounters(ctx: SimResourceContext) { + val counters = _counters + val remainingWork = ctx.remainingWork + counters.demand += _work + counters.actual += _work - remainingWork + counters.overcommit += remainingWork + } } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt new file mode 100644 index 00000000..46c5c63f --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -0,0 +1,422 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.* +import java.time.Clock +import kotlin.math.min + +/** + * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers. + */ +internal class SimResourceContextImpl( + override val parent: SimResourceSystem?, + private val interpreter: SimResourceInterpreterImpl, + private val consumer: SimResourceConsumer, + private val logic: SimResourceProviderLogic +) : SimResourceControllableContext, SimResourceSystem { + /** + * The clock of the context. + */ + override val clock: Clock + get() = interpreter.clock + + /** + * The capacity of the resource. + */ + override var capacity: Double = 0.0 + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** + * The amount of work still remaining at this instant. + */ + override val remainingWork: Double + get() { + val now = clock.millis() + + return if (_remainingWorkFlush < now) { + _remainingWorkFlush = now + computeRemainingWork(now).also { _remainingWork = it } + } else { + _remainingWork + } + } + private var _remainingWork: Double = 0.0 + private var _remainingWorkFlush: Long = Long.MIN_VALUE + + /** + * A flag to indicate the state of the context. + */ + override val state: SimResourceState + get() = _state + private var _state = SimResourceState.Pending + + /** + * The current processing speed of the resource. + */ + override val speed: Double + get() = _speed + private var _speed = 0.0 + + /** + * The current resource processing demand. + */ + override val demand: Double + get() = _limit + + private val counters = object : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + /** + * The current state of the resource context. + */ + private var _timestamp: Long = Long.MIN_VALUE + private var _work: Double = 0.0 + private var _limit: Double = 0.0 + private var _deadline: Long = Long.MAX_VALUE + + /** + * The update flag indicating why the update was triggered. + */ + private var _flag: Flag = Flag.None + + /** + * The current pending update. + */ + private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null + + override fun start() { + check(_state == SimResourceState.Pending) { "Consumer is already started" } + interpreter.batch { + consumer.onEvent(this, SimResourceEvent.Start) + _state = SimResourceState.Active + interrupt() + } + } + + override fun close() { + if (_state != SimResourceState.Stopped) { + interpreter.batch { + _state = SimResourceState.Stopped + doStop() + } + } + } + + override fun interrupt() { + if (_state == SimResourceState.Stopped) { + return + } + + enableFlag(Flag.Interrupt) + scheduleUpdate() + } + + override fun invalidate() { + if (_state == SimResourceState.Stopped) { + return + } + + enableFlag(Flag.Invalidate) + scheduleUpdate() + } + + override fun flush() { + if (_state == SimResourceState.Stopped) { + return + } + + interpreter.scheduleSync(this) + } + + /** + * Determine whether the state of the resource context should be updated. + */ + fun requiresUpdate(timestamp: Long): Boolean { + // Either the resource context is flagged or there is a pending update at this timestamp + return _flag != Flag.None || _pendingUpdate?.timestamp == timestamp + } + + /** + * Update the state of the resource context. + */ + fun doUpdate(timestamp: Long) { + try { + val oldState = _state + val newState = doUpdate(timestamp, oldState) + + _state = newState + _flag = Flag.None + + when (newState) { + SimResourceState.Pending -> + if (oldState != SimResourceState.Pending) { + throw IllegalStateException("Illegal transition to pending state") + } + SimResourceState.Stopped -> + if (oldState != SimResourceState.Stopped) { + doStop() + } + else -> {} + } + } catch (cause: Throwable) { + doFail(cause) + } finally { + _remainingWorkFlush = Long.MIN_VALUE + _timestamp = timestamp + } + } + + override fun onConverge(timestamp: Long) { + if (_state == SimResourceState.Active) { + consumer.onEvent(this, SimResourceEvent.Run) + } + } + + override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]" + + /** + * Update the state of the resource context. + */ + private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState { + return when (state) { + // Resource context is not active, so its state will not update + SimResourceState.Pending, SimResourceState.Stopped -> state + SimResourceState.Active -> { + val isInterrupted = _flag == Flag.Interrupt + val remainingWork = remainingWork + val isConsume = _limit > 0.0 + + // Update the resource counters only if there is some progress + if (timestamp > _timestamp) { + logic.onUpdate(this, _work) + } + + // We should only continue processing the next command if: + // 1. The resource consumption was finished. + // 2. The resource capacity cannot satisfy the demand. + // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + if ((isConsume && remainingWork == 0.0) || _deadline <= timestamp || isInterrupted) { + next(timestamp) + } else if (isConsume) { + interpret(SimResourceCommand.Consume(remainingWork, _limit, _deadline), timestamp) + } else { + interpret(SimResourceCommand.Idle(_deadline), timestamp) + } + } + } + } + + /** + * Stop the resource context. + */ + private fun doStop() { + try { + consumer.onEvent(this, SimResourceEvent.Exit) + logic.onFinish(this) + } catch (cause: Throwable) { + doFail(cause) + } + } + + /** + * Fail the resource consumer. + */ + private fun doFail(cause: Throwable) { + try { + consumer.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + logic.onFinish(this) + } + + /** + * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. + */ + private fun interpret(command: SimResourceCommand, now: Long): SimResourceState { + return when (command) { + is SimResourceCommand.Idle -> { + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = deadline + + val timestamp = logic.onIdle(this, deadline) + scheduleUpdate(timestamp) + + SimResourceState.Active + } + is SimResourceCommand.Consume -> { + val work = command.work + val limit = command.limit + val deadline = command.deadline + + require(deadline >= now) { "Deadline already passed" } + + _speed = min(capacity, limit) + _work = work + _limit = limit + _deadline = deadline + + val timestamp = logic.onConsume(this, work, limit, deadline) + scheduleUpdate(timestamp) + + SimResourceState.Active + } + is SimResourceCommand.Exit -> { + _speed = 0.0 + _work = 0.0 + _limit = 0.0 + _deadline = Long.MAX_VALUE + + SimResourceState.Stopped + } + } + } + + /** + * Request the workload for more work. + */ + private fun next(now: Long): SimResourceState = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the current state. + */ + private fun computeRemainingWork(now: Long): Double { + return if (_work > 0.0) + logic.getRemainingWork(this, _work, speed, now - _timestamp) + else 0.0 + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + + interpreter.batch { + // Inform the consumer of the capacity change. This might already trigger an interrupt. + consumer.onEvent(this, SimResourceEvent.Capacity) + + // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command. + if (isThrottled) { + invalidate() + } + } + } + + /** + * Enable the specified [flag] taking into account precedence. + */ + private fun enableFlag(flag: Flag) { + _flag = when (_flag) { + Flag.None -> flag + Flag.Invalidate -> + when (flag) { + Flag.None -> flag + else -> flag + } + Flag.Interrupt -> + when (flag) { + Flag.None, Flag.Invalidate -> flag + else -> flag + } + } + } + + /** + * Schedule an update for this resource context. + */ + private fun scheduleUpdate() { + // Cancel the pending update + val pendingUpdate = _pendingUpdate + if (pendingUpdate != null) { + _pendingUpdate = null + pendingUpdate.cancel() + } + + interpreter.scheduleImmediate(this) + } + + /** + * Schedule a delayed update for this resource context. + */ + private fun scheduleUpdate(timestamp: Long) { + val pendingUpdate = _pendingUpdate + if (pendingUpdate != null) { + if (pendingUpdate.timestamp == timestamp) { + // Fast-path: A pending update for the same timestamp already exists + return + } else { + // Cancel the old pending update + _pendingUpdate = null + pendingUpdate.cancel() + } + } + + if (timestamp != Long.MAX_VALUE) { + _pendingUpdate = interpreter.scheduleDelayed(this, timestamp) + } + } + + /** + * An enumeration of flags that can be assigned to a resource context to indicate whether they are invalidated or + * interrupted. + */ + enum class Flag { + None, + Interrupt, + Invalidate + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt new file mode 100644 index 00000000..827019c5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceCountersImpl.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import org.opendc.simulator.resources.SimResourceCounters + +/** + * Mutable implementation of the [SimResourceCounters] interface. + */ +internal class SimResourceCountersImpl : SimResourceCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + } + + override fun toString(): String = "SimResourceCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" +} diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt new file mode 100644 index 00000000..cb0d6160 --- /dev/null +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -0,0 +1,331 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.resources.impl + +import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Runnable +import org.opendc.simulator.resources.* +import java.time.Clock +import java.util.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * A [SimResourceInterpreter] queues all interrupts that occur during execution to be executed after. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ +internal class SimResourceInterpreterImpl(private val context: CoroutineContext, override val clock: Clock) : SimResourceInterpreter { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The queue of resource updates that are scheduled for immediate execution. + */ + private val queue = ArrayDeque<Update>() + + /** + * A priority queue containing the resource updates to be scheduled in the future. + */ + private val futureQueue = PriorityQueue<Update>() + + /** + * The stack of interpreter invocations to occur in the future. + */ + private val futureInvocations = ArrayDeque<Invocation>() + + /** + * The systems that have been visited during the interpreter cycle. + */ + private val visited = linkedSetOf<SimResourceSystem>() + + /** + * The index in the batch stack. + */ + private var batchIndex = 0 + + /** + * A flag to indicate that the interpreter is currently active. + */ + private val isRunning: Boolean + get() = batchIndex > 0 + + /** + * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. + * + * This method should be used when the state of a resource context is invalidated/interrupted and needs to be + * re-computed. In case no interpreter is currently active, the interpreter will be started. + */ + fun scheduleImmediate(ctx: SimResourceContextImpl) { + queue.add(Update(ctx, Long.MIN_VALUE)) + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + + /** + * Update the specified [ctx] synchronously. + */ + fun scheduleSync(ctx: SimResourceContextImpl) { + ctx.doUpdate(clock.millis()) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } + + // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked + // up by the active interpreter. + if (isRunning) { + return + } + + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + } + + /** + * Schedule the interpreter to run at [timestamp] to update the resource contexts. + * + * This method will override earlier calls to this method for the same [ctx]. + * + * @param ctx The resource context to which the event applies. + * @param timestamp The timestamp when the interrupt should happen. + */ + fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update { + val now = clock.millis() + val futureQueue = futureQueue + + require(timestamp >= now) { "Timestamp must be in the future" } + + val update = Update(ctx, timestamp) + futureQueue.add(update) + + // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference. + if (futureQueue.peek() === update) { + trySchedule(futureQueue, futureInvocations) + } + + return update + } + + override fun newContext( + consumer: SimResourceConsumer, + provider: SimResourceProviderLogic, + parent: SimResourceSystem? + ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider) + + override fun pushBatch() { + batchIndex++ + } + + override fun popBatch() { + try { + // Flush the work if the platform is not already running + if (batchIndex == 1 && queue.isNotEmpty()) { + runInterpreter() + } + } finally { + batchIndex-- + } + } + + /** + * Interpret all actions that are scheduled for the current timestamp. + */ + private fun runInterpreter() { + val now = clock.millis() + val queue = queue + val futureQueue = futureQueue + val futureInvocations = futureInvocations + val visited = visited + + // Execute all scheduled updates at current timestamp + while (true) { + val update = futureQueue.peek() ?: break + + assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" } + + if (update.timestamp > now && !update.isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(futureQueue, futureInvocations) + break + } + + futureQueue.poll() + + if (update(now) && visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } + } + + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val update = queue.poll() ?: break + if (update(now) && visited.add(update.ctx)) { + collectAncestors(update.ctx, visited) + } + } + + for (system in visited) { + system.onConverge(now) + } + + visited.clear() + } while (queue.isNotEmpty()) + } + + /** + * Try to schedule the next interpreter event. + */ + private fun trySchedule(queue: PriorityQueue<Update>, scheduled: ArrayDeque<Invocation>) { + val nextTimer = queue.peek() + val now = clock.millis() + + // Check whether we need to update our schedule: + if (nextTimer == null) { + // Case 1: all timers are cancelled + for (invocation in scheduled) { + invocation.cancel() + } + scheduled.clear() + return + } + + while (true) { + val invocation = scheduled.peekFirst() + if (invocation == null || invocation.timestamp > nextTimer.timestamp) { + // Case 2: A new timer was registered ahead of the other timers. + // Solution: Schedule a new scheduler invocation + val nextTimestamp = nextTimer.timestamp + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout( + nextTimestamp - now, + { + try { + batchIndex++ + runInterpreter() + } finally { + batchIndex-- + } + }, + context + ) + scheduled.addFirst(Invocation(nextTimestamp, handle)) + break + } else if (invocation.timestamp < nextTimer.timestamp) { + // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted + // Solution: Cancel the next scheduler invocation + invocation.cancel() + scheduled.pollFirst() + } else { + break + } + } + } + + /** + * Collect all the ancestors of the specified [system]. + */ + private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) { + val parent = system.parent + if (parent != null) { + systems.add(parent) + collectAncestors(parent, systems) + } + } + + /** + * A future interpreter invocation. + * + * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private data class Invocation( + @JvmField val timestamp: Long, + @JvmField private val disposableHandle: DisposableHandle + ) { + /** + * Cancel the interpreter invocation. + */ + fun cancel() = disposableHandle.dispose() + } + + /** + * An update call for [ctx] that is scheduled for [timestamp]. + * + * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be + * cancelled if the resource context was invalidated in the meantime. + */ + class Update(@JvmField val ctx: SimResourceContextImpl, @JvmField val timestamp: Long) : Comparable<Update> { + /** + * A flag to indicate that the task has been cancelled. + */ + @JvmField + var isCancelled: Boolean = false + + /** + * Cancel the update. + */ + fun cancel() { + isCancelled = true + } + + /** + * Immediately run update. + */ + operator fun invoke(timestamp: Long): Boolean { + val shouldExecute = !isCancelled && ctx.requiresUpdate(timestamp) + if (shouldExecute) { + ctx.doUpdate(timestamp) + } + return shouldExecute + } + + override fun compareTo(other: Update): Int = timestamp.compareTo(other.timestamp) + + override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]" + } +} diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt index 2b32300e..51024e80 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt @@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceAggregatorMaxMin] class. @@ -41,7 +42,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimResourceAggregatorMaxMinTest { @Test fun testSingleCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val forwarder = SimResourceForwarder() @@ -58,7 +59,7 @@ internal class SimResourceAggregatorMaxMinTest { source.startConsumer(adapter) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertAll( @@ -66,13 +67,13 @@ internal class SimResourceAggregatorMaxMinTest { { assertEquals(listOf(0.0, 0.5, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testDoubleCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -86,20 +87,20 @@ internal class SimResourceAggregatorMaxMinTest { val adapter = SimSpeedConsumerAdapter(consumer, usage::add) try { - aggregator.output.consume(adapter) + aggregator.consume(adapter) yield() assertAll( { assertEquals(1000, clock.millis()) }, { assertEquals(listOf(0.0, 2.0, 0.0), usage) } ) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testOvercommit() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -114,19 +115,19 @@ internal class SimResourceAggregatorMaxMinTest { .andThen(SimResourceCommand.Exit) try { - aggregator.output.consume(consumer) + aggregator.consume(consumer) yield() assertEquals(1000, clock.millis()) verify(exactly = 2) { consumer.onNext(any()) } } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testException() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -141,17 +142,17 @@ internal class SimResourceAggregatorMaxMinTest { .andThenThrows(IllegalStateException("Test Exception")) try { - assertThrows<IllegalStateException> { aggregator.output.consume(consumer) } + assertThrows<IllegalStateException> { aggregator.consume(consumer) } yield() assertEquals(SimResourceState.Pending, sources[0].state) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -163,20 +164,20 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(4.0, 1.0) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(1000) sources[0].capacity = 0.5 } yield() assertEquals(2334, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() } } @Test fun testFailOverCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val aggregator = SimResourceAggregatorMaxMin(scheduler) val sources = listOf( @@ -188,14 +189,40 @@ internal class SimResourceAggregatorMaxMinTest { val consumer = SimWorkConsumer(1.0, 0.5) try { coroutineScope { - launch { aggregator.output.consume(consumer) } + launch { aggregator.consume(consumer) } delay(500) sources[0].capacity = 0.5 } yield() assertEquals(1000, clock.millis()) } finally { - aggregator.output.close() + aggregator.close() + } + } + + @Test + fun testCounters() = runBlockingSimulation { + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + + val aggregator = SimResourceAggregatorMaxMin(scheduler) + val sources = listOf( + SimResourceSource(1.0, scheduler), + SimResourceSource(1.0, scheduler) + ) + sources.forEach(aggregator::addInput) + + val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) + every { consumer.onNext(any()) } + .returns(SimResourceCommand.Consume(4.0, 4.0, 1000)) + .andThen(SimResourceCommand.Exit) + + try { + aggregator.consume(consumer) + yield() + assertEquals(1000, clock.millis()) + assertEquals(2.0, aggregator.counters.actual) { "Actual work mismatch" } + } finally { + aggregator.close() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 2e2d6588..6cb507ce 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -26,98 +26,109 @@ import io.mockk.* import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.resources.impl.SimResourceContextImpl +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** - * A test suite for the [SimAbstractResourceContext] class. + * A test suite for the [SimResourceContextImpl] class. */ @OptIn(ExperimentalCoroutinesApi::class) class SimResourceContextTest { @Test fun testFlushWithoutCommand() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) - context.flush(isIntermediate = false) + context.doUpdate(interpreter.clock.millis()) } @Test fun testIntermediateFlush() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline }) + val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() delay(1) // Delay 1 ms to prevent hitting the fast path - context.flush(isIntermediate = true) + context.doUpdate(interpreter.clock.millis()) - verify(exactly = 2) { context.onConsume(any(), any(), any()) } + verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } } @Test fun testIntermediateFlushIdle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = spyk(object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline }) + val context = spyk(SimResourceContextImpl(null, interpreter, consumer, logic)) context.start() delay(5) - context.flush(isIntermediate = true) + context.invalidate() delay(5) - context.flush(isIntermediate = true) + context.invalidate() assertAll( - { verify(exactly = 2) { context.onIdle(any()) } }, - { verify(exactly = 1) { context.onFinish() } } + { verify(exactly = 2) { logic.onIdle(any(), any()) } }, + { verify(exactly = 1) { logic.onFinish(any()) } } ) } @Test fun testDoubleStart() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Idle(10) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onFinish() {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() - assertThrows<IllegalStateException> { context.start() } + + assertThrows<IllegalStateException> { + context.start() + } } @Test fun testIdempodentCapacityChange() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(10.0, 1.0) andThen SimResourceCommand.Exit - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} + val logic = object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline } + val context = SimResourceContextImpl(null, interpreter, consumer, logic) + context.capacity = 4200.0 context.start() context.capacity = 4200.0 @@ -126,17 +137,19 @@ class SimResourceContextTest { @Test fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Exit every { consumer.onEvent(any(), SimResourceEvent.Exit) } throws IllegalStateException("onEvent") every { consumer.onFailure(any(), any()) } throws IllegalStateException("onFailure") - val context = object : SimAbstractResourceContext(4200.0, scheduler, consumer) { - override fun onIdle(deadline: Long) {} - override fun onConsume(work: Double, limit: Double, deadline: Long) {} - override fun onFinish() {} - } + val logic = spyk(object : SimResourceProviderLogic { + override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long = deadline + override fun onFinish(ctx: SimResourceControllableContext) {} + override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long = deadline + }) + + val context = SimResourceContextImpl(null, interpreter, consumer, logic) context.start() diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt index 5e86088d..08d88093 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSourceTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceSource] class. @@ -40,7 +41,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer class SimResourceSourceTest { @Test fun testSpeed() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -63,7 +64,7 @@ class SimResourceSourceTest { @Test fun testAdjustCapacity() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -83,7 +84,7 @@ class SimResourceSourceTest { @Test fun testSpeedLimit() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -110,7 +111,7 @@ class SimResourceSourceTest { */ @Test fun testIntermediateInterrupt() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -133,7 +134,7 @@ class SimResourceSourceTest { @Test fun testInterrupt() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) lateinit var resCtx: SimResourceContext @@ -174,7 +175,7 @@ class SimResourceSourceTest { @Test fun testFailure() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -193,7 +194,7 @@ class SimResourceSourceTest { @Test fun testExceptionPropagationOnNext() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -213,7 +214,7 @@ class SimResourceSourceTest { @Test fun testConcurrentConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -236,7 +237,7 @@ class SimResourceSourceTest { @Test fun testClosedConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -257,7 +258,7 @@ class SimResourceSourceTest { @Test fun testCloseDuringConsumption() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -279,7 +280,7 @@ class SimResourceSourceTest { @Test fun testIdle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -301,7 +302,7 @@ class SimResourceSourceTest { fun testInfiniteSleep() { assertThrows<IllegalStateException> { runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) @@ -321,7 +322,7 @@ class SimResourceSourceTest { @Test fun testIncorrectDeadline() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val capacity = 4200.0 val provider = SimResourceSource(capacity, scheduler) diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt index 32b6d8ad..ad8d82e3 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusiveTest.kt @@ -33,6 +33,7 @@ import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitchExclusive] class. @@ -44,7 +45,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTrace() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val speed = mutableListOf<Double>() @@ -66,7 +67,7 @@ internal class SimResourceSwitchExclusiveTest { source.startConsumer(adapter) switch.addInput(forwarder) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -86,7 +87,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testRuntimeWorkload() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -97,7 +98,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -113,7 +114,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testTwoWorkloads() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = object : SimResourceConsumer { @@ -141,7 +142,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - val provider = switch.addOutput(3200.0) + val provider = switch.newOutput() try { provider.consume(workload) @@ -158,7 +159,7 @@ internal class SimResourceSwitchExclusiveTest { */ @Test fun testConcurrentWorkloadFails() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L * 1000 val workload = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -169,7 +170,7 @@ internal class SimResourceSwitchExclusiveTest { switch.addInput(source) - switch.addOutput(3200.0) - assertThrows<IllegalStateException> { switch.addOutput(3200.0) } + switch.newOutput() + assertThrows<IllegalStateException> { switch.newOutput() } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt index e7dec172..e4292ec0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMinTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimTraceConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * Test suite for the [SimResourceSwitch] implementations @@ -40,13 +41,13 @@ import org.opendc.simulator.resources.consumer.SimTraceConsumer internal class SimResourceSwitchMaxMinTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val switch = SimResourceSwitchMaxMin(scheduler) val sources = List(2) { SimResourceSource(2000.0, scheduler) } sources.forEach { switch.addInput(it) } - val provider = switch.addOutput(1000.0) + val provider = switch.newOutput() val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) every { consumer.onNext(any()) } returns SimResourceCommand.Consume(1.0, 1.0) andThen SimResourceCommand.Exit @@ -64,27 +65,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L val workload = @@ -97,8 +78,8 @@ internal class SimResourceSwitchMaxMinTest { ), ) - val switch = SimResourceSwitchMaxMin(scheduler, listener) - val provider = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val provider = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -109,9 +90,9 @@ internal class SimResourceSwitchMaxMinTest { } assertAll( - { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -121,27 +102,7 @@ internal class SimResourceSwitchMaxMinTest { */ @Test fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) - - val listener = object : SimResourceSwitchMaxMin.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - - override fun onSliceFinish( - switch: SimResourceSwitchMaxMin, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - totalRequestedWork += requestedWork - totalGrantedWork += grantedWork - totalOvercommittedWork += overcommittedWork - } - } + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val duration = 5 * 60L val workloadA = @@ -163,9 +124,9 @@ internal class SimResourceSwitchMaxMinTest { ) ) - val switch = SimResourceSwitchMaxMin(scheduler, listener) - val providerA = switch.addOutput(3200.0) - val providerB = switch.addOutput(3200.0) + val switch = SimResourceSwitchMaxMin(scheduler) + val providerA = switch.newOutput() + val providerB = switch.newOutput() try { switch.addInput(SimResourceSource(3200.0, scheduler)) @@ -180,9 +141,9 @@ internal class SimResourceSwitchMaxMinTest { switch.close() } assertAll( - { assertEquals(2082000, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1062000, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, { assertEquals(1200000, clock.millis()) } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index 880e1755..810052b8 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimResourceTransformer] class. @@ -41,7 +42,7 @@ internal class SimResourceTransformerTest { @Test fun testExitImmediately() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) launch { @@ -61,7 +62,7 @@ internal class SimResourceTransformerTest { @Test fun testExit() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) launch { @@ -122,7 +123,7 @@ internal class SimResourceTransformerTest { @Test fun testCancelStartedDelegate() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -141,7 +142,7 @@ internal class SimResourceTransformerTest { @Test fun testCancelPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -160,7 +161,7 @@ internal class SimResourceTransformerTest { @Test fun testExitPropagation() = runBlockingSimulation { val forwarder = SimResourceForwarder(isCoupled = true) - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(2000.0, scheduler) val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true) @@ -176,7 +177,7 @@ internal class SimResourceTransformerTest { @Test fun testAdjustCapacity() = runBlockingSimulation { val forwarder = SimResourceForwarder() - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -195,7 +196,7 @@ internal class SimResourceTransformerTest { @Test fun testTransformExit() = runBlockingSimulation { val forwarder = SimResourceTransformer { _, _ -> SimResourceCommand.Exit } - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val source = SimResourceSource(1.0, scheduler) val consumer = spyk(SimWorkConsumer(2.0, 1.0)) @@ -205,4 +206,21 @@ internal class SimResourceTransformerTest { assertEquals(0, clock.millis()) verify(exactly = 1) { consumer.onNext(any()) } } + + @Test + fun testCounters() = runBlockingSimulation { + val forwarder = SimResourceForwarder() + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) + val source = SimResourceSource(1.0, scheduler) + + val consumer = SimWorkConsumer(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt index ac8b5814..db4fe856 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimWorkConsumerTest.kt @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.consumer.SimWorkConsumer +import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl /** * A test suite for the [SimWorkConsumer] class. @@ -35,7 +36,7 @@ import org.opendc.simulator.resources.consumer.SimWorkConsumer internal class SimWorkConsumerTest { @Test fun testSmoke() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 1.0) @@ -50,7 +51,7 @@ internal class SimWorkConsumerTest { @Test fun testUtilization() = runBlockingSimulation { - val scheduler = SimResourceSchedulerTrampoline(coroutineContext, clock) + val scheduler = SimResourceInterpreterImpl(coroutineContext, clock) val provider = SimResourceSource(1.0, scheduler) val consumer = SimWorkConsumer(1.0, 0.5) |
