diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 21:00:41 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-23 10:45:30 +0100 |
| commit | b5ac4b4f0c9a9e0c4b2ee744f8184adbe8e8d76a (patch) | |
| tree | 53e7d40138e5e805c88e800183b3200f257a53f2 /simulator/opendc-simulator/opendc-simulator-resources/src/main | |
| parent | 3718c385f84b463ac799080bb5603e0011adcd7d (diff) | |
simulator: Add support for signaling dynamic capacity changes
This change adds support for dynamically changing the capacity of
resources and propagating this change to consumers.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
9 files changed, 112 insertions, 62 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 18ac0cd8..e5991264 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -100,10 +100,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : get() = _inputs private val _inputs = mutableSetOf<SimResourceProvider>() - private val context = object : SimAbstractResourceContext(clock, _output) { - override val capacity: Double - get() = inputContexts.sumByDouble { it.capacity } - + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double get() = inputContexts.sumByDouble { it.remainingWork } @@ -113,13 +110,9 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : interruptAll() } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - doConsume(work, limit, deadline) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) - override fun onIdle(deadline: Long) { - doIdle(deadline) - } + override fun onIdle(deadline: Long) = doIdle(deadline) override fun onFinish(cause: Throwable?) { doFinish(cause) @@ -176,6 +169,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private inner class Consumer : SimResourceConsumer { override fun onStart(ctx: SimResourceContext) { onContextStarted(ctx) + onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { @@ -189,6 +183,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : return commands[ctx] ?: SimResourceCommand.Idle() } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { onContextFinished(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f65cbaf4..9705bd17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.resources import java.time.Clock -import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -31,10 +30,25 @@ import kotlin.math.min * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ public abstract class SimAbstractResourceContext( + initialCapacity: Double, override val clock: Clock, private val consumer: SimResourceConsumer ) : SimResourceContext { /** + * The capacity of the resource. + */ + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** * The amount of work still remaining at this instant. */ override val remainingWork: Double @@ -50,6 +64,12 @@ public abstract class SimAbstractResourceContext( private set /** + * The current processing speed of the resource. + */ + public var speed: Double = 0.0 + private set + + /** * This method is invoked when the resource will idle until the specified [deadline]. */ public abstract fun onIdle(deadline: Long) @@ -68,20 +88,6 @@ public abstract class SimAbstractResourceContext( } /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - protected open fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } - - /** - * Compute the speed at which the resource may be consumed. - */ - protected open fun getSpeed(limit: Double): Double { - return min(limit, capacity) - } - - /** * Get the remaining work to process after a resource consumption. * * @param work The size of the resource consumption. @@ -183,8 +189,8 @@ public abstract class SimAbstractResourceContext( is SimResourceCommand.Consume -> { // We should only continue processing the next command if: // 1. The resource consumption was finished. - // 2. The resource consumer reached its deadline. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { next(now) } else { @@ -253,6 +259,8 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = 0.0 + onIdle(deadline) } is SimResourceCommand.Consume -> { @@ -262,10 +270,15 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = min(capacity, limit) + onConsume(work, limit, deadline) } is SimResourceCommand.Exit -> { + speed = 0.0 + doStop(null) + // No need to set the next active command return null } @@ -286,15 +299,31 @@ public abstract class SimAbstractResourceContext( val (timestamp, command) = wrapper val duration = now - timestamp return when (command) { - is SimResourceCommand.Consume -> { - val speed = getSpeed(command.limit) - getRemainingWork(command.work, speed, duration) - } + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 } } /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } + } + + /** * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. */ private data class CommandWrapper(val timestamp: Long, val command: SimResourceCommand) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 21f56f9b..f7f3fa4d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. */ public sealed class SimResourceCommand { /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 04c7fcaf..672a3e9d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -46,6 +46,19 @@ public interface SimResourceConsumer { public fun onNext(ctx: SimResourceContext): SimResourceCommand /** + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. + * + * @param ctx The execution context in which the consumer runs. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + + /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], * the resource finished itself, or a failure occurred at the resource. * diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index b0f27b9d..9df333e3 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -335,10 +335,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(clock, consumer), Comparable<OutputContext> { - override val capacity: Double - get() = provider.capacity - + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable<OutputContext> { /** * The current command that is processed by the vCPU. */ @@ -369,7 +366,7 @@ public class SimResourceDistributorMaxMin( override fun onConsume(work: Double, limit: Double, deadline: Long) { reportOvercommit() - allowedSpeed = getSpeed(limit) + allowedSpeed = speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 227f4d62..1a05accd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -118,6 +118,10 @@ public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { } } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3b4e7e7a..9b10edaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock +import kotlin.math.ceil import kotlin.math.min /** @@ -36,7 +37,7 @@ import kotlin.math.min * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( - private val initialCapacity: Double, + initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler<Any> ) : SimResourceProvider { @@ -48,6 +49,15 @@ public class SimResourceSource( private val _speed = MutableStateFlow(0.0) /** + * The capacity of the resource. + */ + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** * The [Context] that is currently running. */ private var ctx: Context? = null @@ -89,20 +99,9 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = initialCapacity - - /** - * The processing speed of the resource. - */ - private var speed: Double = 0.0 - set(value) { - field = value - _speed.value = field - } - + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - speed = 0.0 + _speed.value = speed // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { @@ -111,14 +110,15 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) + _speed.value = speed + val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - speed = 0.0 + _speed.value = speed scheduler.cancel(this) cancel() @@ -127,4 +127,11 @@ public class SimResourceSource( override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 6e431ea1..a10f84b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -86,6 +86,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private val forwarder: SimResourceForwarder ) : SimResourceProvider by forwarder { override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + _outputs -= this availableResources += forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 8f24a020..faa693c4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -39,17 +39,16 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var limit = 0.0 - private var remainingWork: Double = 0.0 - - override fun onStart(ctx: SimResourceContext) { - limit = ctx.capacity * utilization - remainingWork = work - } + private var isFirst = true override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val work = this.remainingWork + ctx.remainingWork - this.remainingWork -= work + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } return if (work > 0.0) { SimResourceCommand.Consume(work, limit) } else { |
