diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 12:13:23 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-22 16:47:56 +0100 |
| commit | f616b720406250b1415593ff04c9d910b1fda54c (patch) | |
| tree | 1e90c30a2173d0b9a84840454d545a64200ac2b8 /simulator/opendc-simulator/opendc-simulator-resources/src/main | |
| parent | f1aa2632804916fb364f4fa207ac8ab97479f711 (diff) | |
simulator: Expose capacity and remaining work outside consumer callback
This change changes the consumer and context interfaces to expose the
provider capacity and remaining work via the context instance as opposed
to only via the callback. This simplifies aggregation of resources.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
8 files changed, 94 insertions, 61 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt index 431ca625..dba334a2 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt @@ -36,6 +36,21 @@ public abstract class SimAbstractResourceContext<R : SimResource>( private val consumer: SimResourceConsumer<R> ) : SimResourceContext<R> { /** + * The capacity of the resource. + */ + override val capacity: Double + get() = resource.capacity + + /** + * The amount of work still remaining at this instant. + */ + override val remainingWork: Double + get() { + val activeCommand = activeCommand ?: return 0.0 + return computeRemainingWork(activeCommand, clock.millis()) + } + + /** * This method is invoked when the resource will idle until the specified [deadline]. */ public abstract fun onIdle(deadline: Long) @@ -64,27 +79,18 @@ public abstract class SimAbstractResourceContext<R : SimResource>( * Compute the speed at which the resource may be consumed. */ protected open fun getSpeed(limit: Double): Double { - return min(limit, getCapacity()) - } - - /** - * Return the capacity available for the resource consumer. - */ - protected open fun getCapacity(): Double { - return resource.capacity + return min(limit, capacity) } /** - * Get the remaining work to process after a resource consumption was flushed. + * Get the remaining work to process after a resource consumption. * * @param work The size of the resource consumption. * @param speed The speed of consumption. * @param duration The duration from the start of the consumption until now. - * @param isInterrupted A flag to indicate that the resource consumption could not be fully processed due to - * it being interrupted before it could finish or reach its deadline. * @return The amount of work remaining. */ - protected open fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + protected open fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { return if (duration > 0L) { val processed = duration / 1000.0 * speed max(0.0, work - processed) @@ -99,13 +105,15 @@ public abstract class SimAbstractResourceContext<R : SimResource>( public fun start() { check(state == SimResourceState.Pending) { "Consumer is already started" } + val now = clock.millis() + state = SimResourceState.Active isProcessing = true - latestFlush = clock.millis() + latestFlush = now try { consumer.onStart(this) - interpret(consumer.onNext(this, getCapacity(), 0.0)) + activeCommand = interpret(consumer.onNext(this), now) } catch (cause: Throwable) { doStop(cause) } finally { @@ -154,36 +162,34 @@ public abstract class SimAbstractResourceContext<R : SimResource>( val activeCommand = activeCommand ?: return val (timestamp, command) = activeCommand + // Note: accessor is reliant on activeCommand being set + val remainingWork = remainingWork + isProcessing = true - this.activeCommand = null val duration = now - timestamp assert(duration >= 0) { "Flush in the past" } - when (command) { + this.activeCommand = when (command) { is SimResourceCommand.Idle -> { // We should only continue processing the next command if: // 1. The resource consumer reached its deadline. // 2. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (command.deadline <= now || !isIntermediate) { - next(remainingWork = 0.0) + next(now) } else { - this.activeCommand = activeCommand + activeCommand } } is SimResourceCommand.Consume -> { - val speed = min(resource.capacity, command.limit) - val isInterrupted = !isIntermediate && duration < getDuration(command.work, speed) - val remainingWork = getRemainingWork(command.work, speed, duration, isInterrupted) - // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource consumer reached its deadline. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) if (remainingWork == 0.0 || command.deadline <= now || !isIntermediate) { - next(remainingWork) + next(now) } else { - interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline)) + interpret(SimResourceCommand.Consume(remainingWork, command.limit, command.deadline), now) } } SimResourceCommand.Exit -> @@ -238,6 +244,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( this.state = SimResourceState.Stopped if (state == SimResourceState.Active) { + activeCommand = null onFinish(cause) } } @@ -245,9 +252,7 @@ public abstract class SimAbstractResourceContext<R : SimResource>( /** * Interpret the specified [SimResourceCommand] that was submitted by the resource consumer. */ - private fun interpret(command: SimResourceCommand) { - val now = clock.millis() - + private fun interpret(command: SimResourceCommand, now: Long): CommandWrapper? { when (command) { is SimResourceCommand.Idle -> { val deadline = command.deadline @@ -265,18 +270,34 @@ public abstract class SimAbstractResourceContext<R : SimResource>( onConsume(work, limit, deadline) } - is SimResourceCommand.Exit -> doStop(null) + is SimResourceCommand.Exit -> { + doStop(null) + // No need to set the next active command + return null + } } - assert(activeCommand == null) { "Concurrent access to current command" } - activeCommand = CommandWrapper(now, command) + return CommandWrapper(now, command) } /** * Request the workload for more work. */ - private fun next(remainingWork: Double) { - interpret(consumer.onNext(this, getCapacity(), remainingWork)) + private fun next(now: Long): CommandWrapper? = interpret(consumer.onNext(this), now) + + /** + * Compute the remaining work based on the specified [wrapper] and [timestamp][now]. + */ + private fun computeRemainingWork(wrapper: CommandWrapper, now: Long): Double { + val (timestamp, command) = wrapper + val duration = now - timestamp + return when (command) { + is SimResourceCommand.Consume -> { + val speed = getSpeed(command.limit) + getRemainingWork(command.work, speed, duration) + } + is SimResourceCommand.Idle, SimResourceCommand.Exit -> 0.0 + } } /** diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 7637ee69..01b56488 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -41,11 +41,9 @@ public interface SimResourceConsumer<in R : SimResource> { * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. - * @param capacity The capacity that is available for the consumer. In case no capacity is available, zero is given. - * @param remainingWork The work of the previous command that was not yet completed due to interruption or deadline. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand + public fun onNext(ctx: SimResourceContext<R>): SimResourceCommand /** * This method is invoked when the consumer has finished, either because it exited via [SimResourceCommand.Exit], diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index dfb5e9ce..f13764fb 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -40,6 +40,16 @@ public interface SimResourceContext<out R : SimResource> { public val clock: Clock /** + * The resource capacity available at this instant. + */ + public val capacity: Double + + /** + * The amount of work still remaining at this instant. + */ + public val remainingWork: Double + + /** * Ask the resource provider to interrupt its resource. */ public fun interrupt() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt index 23bf8739..732e709a 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceForwarder.kt @@ -43,11 +43,6 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : private var hasDelegateStarted: Boolean = false /** - * The remaining amount of work last cycle. - */ - private var remainingWork: Double = 0.0 - - /** * The state of the forwarder. */ override var state: SimResourceState = SimResourceState.Pending @@ -92,9 +87,8 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : this.ctx = ctx } - override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { val delegate = delegate - this.remainingWork = remainingWork if (!hasDelegateStarted) { start() @@ -103,7 +97,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : return if (state == SimResourceState.Stopped) { SimResourceCommand.Exit } else if (delegate != null) { - val command = delegate.onNext(ctx, capacity, remainingWork) + val command = delegate.onNext(ctx) if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards @@ -114,7 +108,7 @@ public class SimResourceForwarder<R : SimResource>(override val resource: R) : if (state == SimResourceState.Stopped) SimResourceCommand.Exit else - onNext(ctx, capacity, 0.0) + onNext(ctx) } else { command } diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index a3b16177..99545c4c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.resources -import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import org.opendc.utils.TimerScheduler diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt index 6b919a77..ee8edfcd 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSwitchMaxMin.kt @@ -57,9 +57,10 @@ public class SimResourceSwitchMaxMin<R : SimResource>( private val outputContexts: MutableList<OutputContext> = mutableListOf() /** - * The total amount of remaining work (of all pCPUs). + * The remaining work of all inputs. */ - private var totalRemainingWork: Double = 0.0 + private val totalRemainingWork: Double + get() = inputConsumers.sumByDouble { it.remainingWork } /** * The total speed requested by the vCPUs. @@ -241,6 +242,8 @@ public class SimResourceSwitchMaxMin<R : SimResource>( * Flush the progress of the vCPUs. */ private fun flushGuests() { + val totalRemainingWork = totalRemainingWork + // Flush all the outputs work for (output in outputContexts) { output.flush(isIntermediate = true) @@ -256,7 +259,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>( totalRequestedSpeed, totalAllocatedSpeed ) - totalRemainingWork = 0.0 totalInterferedWork = 0.0 totalOvercommittedWork = 0.0 @@ -362,29 +364,39 @@ public class SimResourceSwitchMaxMin<R : SimResource>( */ var actualSpeed: Double = 0.0 + private fun reportOvercommit() { + totalOvercommittedWork += remainingWork + } + override fun onIdle(deadline: Long) { + reportOvercommit() + allowedSpeed = 0.0 activeCommand = SimResourceCommand.Idle(deadline) } override fun onConsume(work: Double, limit: Double, deadline: Long) { + reportOvercommit() + allowedSpeed = getSpeed(limit) activeCommand = SimResourceCommand.Consume(work, limit, deadline) } override fun onFinish(cause: Throwable?) { + reportOvercommit() + activeCommand = SimResourceCommand.Exit provider.cancel() super.onFinish(cause) } - override fun getRemainingWork(work: Double, speed: Double, duration: Long, isInterrupted: Boolean): Double { + override fun getRemainingWork(work: Double, speed: Double, duration: Long): Double { // Apply performance interference model val performanceScore = 1.0 // Compute the remaining amount of work - val remainingWork = if (work > 0.0) { + return if (work > 0.0) { // Compute the fraction of compute time allocated to the VM val fraction = actualSpeed / totalAllocatedSpeed @@ -400,12 +412,6 @@ public class SimResourceSwitchMaxMin<R : SimResource>( } else { 0.0 } - - if (!isInterrupted) { - totalOvercommittedWork += remainingWork - } - - return remainingWork } override fun interrupt() { @@ -433,6 +439,12 @@ public class SimResourceSwitchMaxMin<R : SimResource>( */ private lateinit var ctx: SimResourceContext<R> + /** + * The remaining work of this consumer. + */ + val remainingWork: Double + get() = ctx.remainingWork + init { barrier = SimConsumerBarrier(barrier.parties + 1) input.startConsumer(this@InputConsumer) @@ -449,8 +461,7 @@ public class SimResourceSwitchMaxMin<R : SimResource>( this.ctx = ctx } - override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand { - totalRemainingWork += remainingWork + override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { val isLast = barrier.enter() // Flush the progress of the guest after the barrier has been reached. diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index a00ee575..0189fe4c 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -39,7 +39,7 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour iterator = trace.iterator() } - override fun onNext(ctx: SimResourceContext<SimResource>, capacity: Double, remainingWork: Double): SimResourceCommand { + override fun onNext(ctx: SimResourceContext<SimResource>): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { val now = ctx.clock.millis() diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index bad2f403..62425583 100644 --- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -48,8 +48,8 @@ public class SimWorkConsumer<R : SimResource>( remainingWork = work } - override fun onNext(ctx: SimResourceContext<R>, capacity: Double, remainingWork: Double): SimResourceCommand { - val work = this.remainingWork + remainingWork + override fun onNext(ctx: SimResourceContext<R>): SimResourceCommand { + val work = this.remainingWork + ctx.remainingWork this.remainingWork -= work return if (work > 0.0) { SimResourceCommand.Consume(work, limit) |
