From b5ac4b4f0c9a9e0c4b2ee744f8184adbe8e8d76a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 22 Mar 2021 21:00:41 +0100 Subject: simulator: Add support for signaling dynamic capacity changes This change adds support for dynamically changing the capacity of resources and propagating this change to consumers. --- .../resources/SimAbstractResourceAggregator.kt | 19 +++--- .../resources/SimAbstractResourceContext.kt | 71 +++++++++++++++------- .../simulator/resources/SimResourceCommand.kt | 2 +- .../simulator/resources/SimResourceConsumer.kt | 13 ++++ .../resources/SimResourceDistributorMaxMin.kt | 7 +-- .../simulator/resources/SimResourceForwarder.kt | 4 ++ .../simulator/resources/SimResourceSource.kt | 39 +++++++----- .../resources/SimResourceSwitchExclusive.kt | 2 + .../resources/consumer/SimWorkConsumer.kt | 17 +++--- 9 files changed, 112 insertions(+), 62 deletions(-) (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main') diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 18ac0cd8..e5991264 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -100,10 +100,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : get() = _inputs private val _inputs = mutableSetOf() - private val context = object : SimAbstractResourceContext(clock, _output) { - override val capacity: Double - get() = inputContexts.sumByDouble { it.capacity } - + private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) { override val remainingWork: Double get() = inputContexts.sumByDouble { it.remainingWork } @@ -113,13 +110,9 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : interruptAll() } - override fun onConsume(work: Double, limit: Double, deadline: Long) { - doConsume(work, limit, deadline) - } + override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline) - override fun onIdle(deadline: Long) { - doIdle(deadline) - } + override fun onIdle(deadline: Long) = doIdle(deadline) override fun onFinish(cause: Throwable?) { doFinish(cause) @@ -176,6 +169,7 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : private inner class Consumer : SimResourceConsumer { override fun onStart(ctx: SimResourceContext) { onContextStarted(ctx) + onCapacityChanged(ctx, false) // Make sure we initialize the output if we have not done so yet if (context.state == SimResourceState.Pending) { @@ -189,6 +183,11 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) : return commands[ctx] ?: SimResourceCommand.Idle() } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + // Adjust capacity of output resource + context.capacity = inputContexts.sumByDouble { it.capacity } + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { onContextFinished(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index f65cbaf4..9705bd17 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.resources import java.time.Clock -import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -31,9 +30,24 @@ import kotlin.math.min * Partial implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ public abstract class SimAbstractResourceContext( + initialCapacity: Double, override val clock: Clock, private val consumer: SimResourceConsumer ) : SimResourceContext { + /** + * The capacity of the resource. + */ + public final override var capacity: Double = initialCapacity + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + /** * The amount of work still remaining at this instant. */ @@ -49,6 +63,12 @@ public abstract class SimAbstractResourceContext( public var state: SimResourceState = SimResourceState.Pending private set + /** + * The current processing speed of the resource. + */ + public var speed: Double = 0.0 + private set + /** * This method is invoked when the resource will idle until the specified [deadline]. */ @@ -67,20 +87,6 @@ public abstract class SimAbstractResourceContext( consumer.onFinish(this, cause) } - /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - protected open fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } - - /** - * Compute the speed at which the resource may be consumed. - */ - protected open fun getSpeed(limit: Double): Double { - return min(limit, capacity) - } - /** * Get the remaining work to process after a resource consumption. * @@ -183,8 +189,8 @@ public abstract class SimAbstractResourceContext( is SimResourceCommand.Consume -> { // We should only continue processing the next command if: // 1. The resource consumption was finished. - // 2. The resource consumer reached its deadline. - // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) + // 2. The resource capacity cannot satisfy the demand. + // 4. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { next(now) } else { @@ -253,6 +259,8 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = 0.0 + onIdle(deadline) } is SimResourceCommand.Consume -> { @@ -262,10 +270,15 @@ public abstract class SimAbstractResourceContext( require(deadline >= now) { "Deadline already passed" } + speed = min(capacity, limit) + onConsume(work, limit, deadline) } is SimResourceCommand.Exit -> { + speed = 0.0 + doStop(null) + // No need to set the next active command return null } @@ -286,14 +299,30 @@ public abstract class SimAbstractResourceContext( val (timestamp, command) = wrapper val duration = now - timestamp return when (command) { - is SimResourceCommand.Consume -> { - val speed = getSpeed(command.limit) - getRemainingWork(command.work, speed, duration) - } + is SimResourceCommand.Consume -> getRemainingWork(command.work, speed, duration) is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 } } + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (state != SimResourceState.Active) { + return + } + + val isThrottled = speed > capacity + consumer.onCapacityChanged(this, isThrottled) + + // Optimization: only flush changes if the new capacity cannot satisfy the active resource command. + // Alternatively, if the consumer already interrupts the resource, the fast-path will be taken in flush(). + if (isThrottled) { + flush(isIntermediate = true) + } + } + /** * This class wraps a [command] with the timestamp it was started and possibly the task associated with it. */ diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index 21f56f9b..f7f3fa4d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * A SimResourceCommand communicates to a [SimResource] how it is consumed by a [SimResourceConsumer]. + * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer]. */ public sealed class SimResourceCommand { /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 04c7fcaf..672a3e9d 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -45,6 +45,19 @@ public interface SimResourceConsumer { */ public fun onNext(ctx: SimResourceContext): SimResourceCommand + /** + * This is method is invoked when the capacity of the resource changes. + * + * After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the + * resource via [SimResourceContext.interrupt]. Alternatively, the consumer may decide to ignore the event, possibly + * causing the active resource command to finish at a later moment than initially planned. + * + * @param ctx The execution context in which the consumer runs. + * @param isThrottled A flag to indicate that the active resource command will be throttled as a result of the + * capacity change. + */ + public fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {} + /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], * the resource finished itself, or a failure occurred at the resource. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index b0f27b9d..9df333e3 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -335,10 +335,7 @@ public class SimResourceDistributorMaxMin( private inner class OutputContext( private val provider: OutputProvider, consumer: SimResourceConsumer - ) : SimAbstractResourceContext(clock, consumer), Comparable { - override val capacity: Double - get() = provider.capacity - + ) : SimAbstractResourceContext(provider.capacity, clock, consumer), Comparable { /** * The current command that is processed by the vCPU. */ @@ -369,7 +366,7 @@ public class SimResourceDistributorMaxMin( override fun onConsume(work: Double, limit: Double, deadline: Long) { reportOvercommit() - allowedSpeed = getSpeed(limit) + allowedSpeed = speed activeCommand = SimResourceCommand.Consume(work, limit, deadline) } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 227f4d62..1a05accd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -118,6 +118,10 @@ public class SimResourceForwarder : SimResourceProvider, SimResourceConsumer { } } + override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) { + delegate?.onCapacityChanged(ctx, isThrottled) + } + override fun onFinish(ctx: SimResourceContext, cause: Throwable?) { this.ctx = null diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 3b4e7e7a..9b10edaf 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler import java.time.Clock +import kotlin.math.ceil import kotlin.math.min /** @@ -36,7 +37,7 @@ import kotlin.math.min * @param scheduler The scheduler to schedule the interrupts. */ public class SimResourceSource( - private val initialCapacity: Double, + initialCapacity: Double, private val clock: Clock, private val scheduler: TimerScheduler ) : SimResourceProvider { @@ -47,6 +48,15 @@ public class SimResourceSource( get() = _speed private val _speed = MutableStateFlow(0.0) + /** + * The capacity of the resource. + */ + public var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + /** * The [Context] that is currently running. */ @@ -89,20 +99,9 @@ public class SimResourceSource( /** * Internal implementation of [SimResourceContext] for this class. */ - private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(clock, consumer) { - override val capacity: Double = initialCapacity - - /** - * The processing speed of the resource. - */ - private var speed: Double = 0.0 - set(value) { - field = value - _speed.value = field - } - + private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) { override fun onIdle(deadline: Long) { - speed = 0.0 + _speed.value = speed // Do not resume if deadline is "infinite" if (deadline != Long.MAX_VALUE) { @@ -111,14 +110,15 @@ public class SimResourceSource( } override fun onConsume(work: Double, limit: Double, deadline: Long) { - speed = getSpeed(limit) + _speed.value = speed + val until = min(deadline, clock.millis() + getDuration(work, speed)) scheduler.startSingleTimerTo(this, until, ::flush) } override fun onFinish(cause: Throwable?) { - speed = 0.0 + _speed.value = speed scheduler.cancel(this) cancel() @@ -127,4 +127,11 @@ public class SimResourceSource( override fun toString(): String = "SimResourceSource.Context[capacity=$capacity]" } + + /** + * Compute the duration that a resource consumption will take with the specified [speed]. + */ + private fun getDuration(work: Double, speed: Double): Long { + return ceil(work / speed * 1000).toLong() + } } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt index 6e431ea1..a10f84b6 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchExclusive.kt @@ -86,6 +86,8 @@ public class SimResourceSwitchExclusive : SimResourceSwitch { private val forwarder: SimResourceForwarder ) : SimResourceProvider by forwarder { override fun close() { + // We explicitly do not close the forwarder here in order to re-use it across output resources. + _outputs -= this availableResources += forwarder } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index 8f24a020..faa693c4 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -39,17 +39,16 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var limit = 0.0 - private var remainingWork: Double = 0.0 - - override fun onStart(ctx: SimResourceContext) { - limit = ctx.capacity * utilization - remainingWork = work - } + private var isFirst = true override fun onNext(ctx: SimResourceContext): SimResourceCommand { - val work = this.remainingWork + ctx.remainingWork - this.remainingWork -= work + val limit = ctx.capacity * utilization + val work = if (isFirst) { + isFirst = false + work + } else { + ctx.remainingWork + } return if (work > 0.0) { SimResourceCommand.Consume(work, limit) } else { -- cgit v1.2.3