diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-26 13:11:10 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:37 +0200 |
| commit | d575bed5418be222e1d3ad39af862e2390596d61 (patch) | |
| tree | e00656f774a62543a032284f5ef00da479b293d6 /opendc-simulator/opendc-simulator-resources/src/main | |
| parent | a4a611c45dfd5f9e379434f1dc459128cb437338 (diff) | |
refactor(simulator): Combine work and deadline to duration
This change removes the work and deadline properties from the
SimResourceCommand.Consume class and introduces a new property duration.
This property is now used in conjunction with the limit to compute the amount
of work processed by a resource provider.
Previously, we used both work and deadline to compute the duration and
the amount of remaining work at the end of a consumption. However, with
this change, we ensure that a resource consumption always runs at the
same speed once establishing, drastically simplifying the computation
for the amount of work processed during the consumption.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src/main')
14 files changed, 152 insertions, 308 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index 00648876..da5c3257 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -32,12 +32,7 @@ public abstract class SimAbstractResourceAggregator( /** * This method is invoked when the resource consumer consumes resources. */ - protected abstract fun doConsume(work: Double, limit: Double, deadline: Long) - - /** - * This method is invoked when the resource consumer enters an idle state. - */ - protected abstract fun doIdle(deadline: Long) + protected abstract fun doConsume(limit: Double, duration: Long) /** * This method is invoked when the resource consumer finishes processing. @@ -98,26 +93,23 @@ public abstract class SimAbstractResourceAggregator( private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - doIdle(deadline) - return Long.MAX_VALUE - } - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - doConsume(work, limit, deadline) - return Long.MAX_VALUE + override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { + doConsume(limit, duration) + return super.onConsume(ctx, now, limit, duration) } override fun onFinish(ctx: SimResourceControllableContext) { doFinish() } - override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - updateCounters(ctx, work, willOvercommit) - } - - override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return work - _inputConsumers.sumOf { it.remainingWork } + override fun onUpdate( + ctx: SimResourceControllableContext, + delta: Long, + limit: Double, + willOvercommit: Boolean + ) { + updateCounters(ctx, delta, limit, willOvercommit) } } } @@ -157,12 +149,6 @@ public abstract class SimAbstractResourceAggregator( private var _ctx: SimResourceContext? = null /** - * The remaining work of the consumer. - */ - val remainingWork: Double - get() = _ctx?.remainingWork ?: 0.0 - - /** * The resource command to run next. */ private var command: SimResourceCommand? = null @@ -179,7 +165,7 @@ public abstract class SimAbstractResourceAggregator( } /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { var next = command return if (next != null) { @@ -190,7 +176,7 @@ public abstract class SimAbstractResourceAggregator( next = command this.command = null - next ?: SimResourceCommand.Idle() + next ?: SimResourceCommand.Consume(0.0) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 4e8e803a..548bc228 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -87,21 +87,35 @@ public abstract class SimAbstractResourceProvider( /** * Update the counters of the resource provider. */ - protected fun updateCounters(ctx: SimResourceContext, work: Double, willOvercommit: Boolean) { - if (work <= 0.0) { + protected fun updateCounters(ctx: SimResourceContext, delta: Long, limit: Double, willOvercommit: Boolean) { + if (delta <= 0.0) { return } val counters = _counters - val remainingWork = ctx.remainingWork + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = ctx.speed * deltaS + val remainingWork = work - actualWork + counters.demand += work - counters.actual += work - remainingWork + counters.actual += actualWork if (willOvercommit && remainingWork > 0.0) { counters.overcommit += remainingWork } } + /** + * Update the counters of the resource provider. + */ + protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + val counters = _counters + counters.demand += demand + counters.actual += actual + counters.overcommit += overcommit + } + final override fun startConsumer(consumer: SimResourceConsumer) { check(ctx == null) { "Resource is in invalid state" } val ctx = interpreter.newContext(consumer, createLogic(), parent) diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt index 991cda7a..537be1b5 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt @@ -31,7 +31,7 @@ public class SimResourceAggregatorMaxMin( ) : SimAbstractResourceAggregator(interpreter, parent) { private val consumers = mutableListOf<Input>() - override fun doConsume(work: Double, limit: Double, deadline: Long) { + override fun doConsume(limit: Double, duration: Long) { // Sort all consumers by their capacity consumers.sortWith(compareBy { it.ctx.capacity }) @@ -40,22 +40,15 @@ public class SimResourceAggregatorMaxMin( val inputCapacity = input.ctx.capacity val fraction = inputCapacity / capacity val grantedSpeed = limit * fraction - val grantedWork = fraction * work - val command = if (grantedWork > 0.0 && grantedSpeed > 0.0) - SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline) + val command = if (grantedSpeed > 0.0) + SimResourceCommand.Consume(grantedSpeed, duration) else - SimResourceCommand.Idle() + SimResourceCommand.Consume(0.0, duration) input.push(command) } } - override fun doIdle(deadline: Long) { - for (input in consumers) { - input.push(SimResourceCommand.Idle(deadline)) - } - } - override fun doFinish() { val iterator = consumers.iterator() for (input in iterator) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt index f7f3fa4d..4a980071 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt @@ -27,25 +27,19 @@ package org.opendc.simulator.resources */ public sealed class SimResourceCommand { /** - * A request to the resource to perform the specified amount of work before the given [deadline]. + * A request to the resource to perform work for the specified [duration]. * - * @param work The amount of work to process. * @param limit The maximum amount of work to be processed per second. - * @param deadline The instant at which the work needs to be fulfilled. + * @param duration The duration of the resource consumption in milliseconds. */ - public data class Consume(val work: Double, val limit: Double, val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() { + public data class Consume(val limit: Double, val duration: Long = Long.MAX_VALUE) : SimResourceCommand() { init { - require(work > 0) { "Amount of work must be positive" } - require(limit > 0) { "Limit must be positive" } + require(limit >= 0.0) { "Negative limit is not allowed" } + require(duration >= 0) { "Duration must be positive" } } } /** - * An indication to the resource that the consumer will idle until the specified [deadline] or if it is interrupted. - */ - public data class Idle(val deadline: Long = Long.MAX_VALUE) : SimResourceCommand() - - /** * An indication to the resource that the consumer has finished. */ public object Exit : SimResourceCommand() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt index 4d937514..4d1d2c32 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt @@ -34,9 +34,11 @@ public interface SimResourceConsumer { * the resource finished processing, reached its deadline or was interrupted. * * @param ctx The execution context in which the consumer runs. + * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @param delta The virtual duration between this call and the last call in milliseconds. * @return The next command that the resource should execute. */ - public fun onNext(ctx: SimResourceContext): SimResourceCommand + public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand /** * This method is invoked when an event has occurred. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt index 0d9a6106..f28b43d0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt @@ -50,11 +50,6 @@ public interface SimResourceContext { public val demand: Double /** - * The amount of work still remaining at this instant. - */ - public val remainingWork: Double - - /** * Ask the resource provider to interrupt its resource. */ public fun interrupt() diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index 63cfbdac..d23c7dbb 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -97,8 +97,8 @@ public class SimResourceDistributorMaxMin( } /* SimResourceConsumer */ - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return doNext(ctx) + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + return doNext(ctx, now) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { @@ -135,38 +135,16 @@ public class SimResourceDistributorMaxMin( } /** - * Update the counters of the distributor. - */ - private fun updateCounters(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - if (work <= 0.0) { - return - } - - val counters = _counters - val remainingWork = ctx.remainingWork - - counters.demand += work - counters.actual += work - remainingWork - - if (willOvercommit && remainingWork > 0.0) { - counters.overcommit += remainingWork - } - } - - /** * Schedule the work of the outputs. */ - private fun doNext(ctx: SimResourceContext): SimResourceCommand { + private fun doNext(ctx: SimResourceContext, now: Long): SimResourceCommand { // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { - return SimResourceCommand.Idle() + return SimResourceCommand.Consume(0.0) } - val now = interpreter.clock.millis() - val capacity = ctx.capacity - var duration: Double = Double.MAX_VALUE - var deadline: Long = Long.MAX_VALUE + var duration: Long = Long.MAX_VALUE var availableSpeed = capacity var totalRequestedSpeed = 0.0 @@ -191,10 +169,10 @@ public class SimResourceDistributorMaxMin( val availableShare = availableSpeed / remaining-- val grantedSpeed = min(output.allowedSpeed, availableShare) - deadline = min(deadline, output.deadline) + duration = min(duration, output.duration) // Ignore idle computation - if (grantedSpeed <= 0.0 || output.work <= 0.0) { + if (grantedSpeed <= 0.0) { output.actualSpeed = 0.0 continue } @@ -203,35 +181,29 @@ public class SimResourceDistributorMaxMin( output.actualSpeed = grantedSpeed availableSpeed -= grantedSpeed - - // The duration that we want to run is that of the shortest request of an output - duration = min(duration, output.work / grantedSpeed) } - val targetDuration = min(duration, (deadline - now) / 1000.0) + val durationS = duration / 1000.0 var totalRequestedWork = 0.0 var totalAllocatedWork = 0.0 for (output in activeOutputs) { - val work = output.work + val limit = output.limit val speed = output.actualSpeed if (speed > 0.0) { - val outputDuration = work / speed - totalRequestedWork += work * (duration / outputDuration) - totalAllocatedWork += work * (targetDuration / outputDuration) + totalRequestedWork += limit * durationS + totalAllocatedWork += speed * durationS } } - assert(deadline >= now) { "Deadline already passed" } - this.totalRequestedSpeed = totalRequestedSpeed this.totalAllocatedWork = totalAllocatedWork val totalAllocatedSpeed = capacity - availableSpeed this.totalAllocatedSpeed = totalAllocatedSpeed return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0) - SimResourceCommand.Consume(totalAllocatedWork, totalAllocatedSpeed, deadline) + SimResourceCommand.Consume(totalAllocatedSpeed, duration) else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Consume(0.0, duration) } private fun updateCapacity(ctx: SimResourceContext) { @@ -243,7 +215,7 @@ public class SimResourceDistributorMaxMin( /** * An internal [SimResourceProvider] implementation for switch outputs. */ - private inner class Output(capacity: Double, private val key: InterferenceKey?) : + private inner class Output(capacity: Double, val key: InterferenceKey?) : SimAbstractResourceProvider(interpreter, parent, capacity), SimResourceCloseableProvider, SimResourceProviderLogic, @@ -254,11 +226,6 @@ public class SimResourceDistributorMaxMin( private var isClosed: Boolean = false /** - * The current requested work. - */ - @JvmField var work: Double = 0.0 - - /** * The requested limit. */ @JvmField var limit: Double = 0.0 @@ -266,7 +233,7 @@ public class SimResourceDistributorMaxMin( /** * The current deadline. */ - @JvmField var deadline: Long = Long.MAX_VALUE + @JvmField var duration: Long = Long.MAX_VALUE /** * The processing speed that is allowed by the model constraints. @@ -304,44 +271,19 @@ public class SimResourceDistributorMaxMin( } /* SimResourceProviderLogic */ - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - allowedSpeed = 0.0 - this.deadline = deadline - work = 0.0 - limit = 0.0 - lastCommandTimestamp = ctx.clock.millis() - - return Long.MAX_VALUE - } - - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { + override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { allowedSpeed = min(ctx.capacity, limit) - this.work = work this.limit = limit - this.deadline = deadline - lastCommandTimestamp = ctx.clock.millis() - - return Long.MAX_VALUE - } - - override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - updateCounters(ctx, work, willOvercommit) + this.duration = duration + lastCommandTimestamp = now - this@SimResourceDistributorMaxMin.updateCounters(ctx, work, willOvercommit) - } - - override fun onFinish(ctx: SimResourceControllableContext) { - work = 0.0 - limit = 0.0 - deadline = Long.MAX_VALUE - lastCommandTimestamp = ctx.clock.millis() + return super.onConsume(ctx, now, limit, duration) } - override fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - val totalRemainingWork = this@SimResourceDistributorMaxMin.ctx?.remainingWork ?: 0.0 - - // Compute the fraction of compute time allocated to the output - val fraction = actualSpeed / totalAllocatedSpeed + override fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) { + if (delta <= 0.0) { + return + } // Compute the performance penalty due to resource interference val perfScore = if (interferenceDomain != null) { @@ -351,12 +293,29 @@ public class SimResourceDistributorMaxMin( 1.0 } - // Compute the work that was actually granted to the output. - val potentialConsumedWork = (totalAllocatedWork - totalRemainingWork) * fraction + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualSpeed * deltaS + val remainingWork = work - actualWork + val overcommit = if (willOvercommit && remainingWork > 0.0) { + remainingWork + } else { + 0.0 + } - _counters.interference += potentialConsumedWork * max(0.0, 1 - perfScore) + updateCounters(work, actualWork, overcommit) - return potentialConsumedWork + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += overcommit + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + + override fun onFinish(ctx: SimResourceControllableContext) { + limit = 0.0 + duration = Long.MAX_VALUE + lastCommandTimestamp = ctx.clock.millis() } /* Comparable */ diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index 2fe1b00f..d8ff87f9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -27,53 +27,33 @@ package org.opendc.simulator.resources */ public interface SimResourceProviderLogic { /** - * This method is invoked when the resource is reported to idle until the specified [deadline]. + * This method is invoked when the consumer ask to consume the resource for the specified [duration]. * * @param ctx The context in which the provider runs. - * @param deadline The deadline that was requested by the resource consumer. - * @return The instant at which to resume the consumer. - */ - public fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long - - /** - * This method is invoked when the resource will be consumed until the specified amount of [work] was processed - * or [deadline] is reached. - * - * @param ctx The context in which the provider runs. - * @param work The amount of work that was requested by the resource consumer. + * @param now The virtual timestamp in milliseconds at which the update is occurring. * @param limit The limit on the work rate of the resource consumer. - * @param deadline The deadline that was requested by the resource consumer. - * @return The instant at which to resume the consumer. + * @param duration The duration of the consumption in milliseconds. + * @return The deadline of the resource consumption. */ - public fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long + public fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { + return if (duration == Long.MAX_VALUE) { + return Long.MAX_VALUE + } else { + now + duration + } + } /** * This method is invoked when the progress of the resource consumer is materialized. * * @param ctx The context in which the provider runs. - * @param work The amount of work that was requested by the resource consumer. + * @param limit The limit on the work rate of the resource consumer. * @param willOvercommit A flag to indicate that the remaining work is overcommitted. */ - public fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) {} + public fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {} /** * This method is invoked when the resource consumer has finished. */ - public fun onFinish(ctx: SimResourceControllableContext) - - /** - * Compute the amount of work that was consumed over the specified [duration]. - * - * @param work The total size of the resource consumption. - * @param speed The speed of the resource provider. - * @param duration The duration from the start of the consumption until now. - * @return The amount of work that was consumed by the resource provider. - */ - public fun getConsumedWork(ctx: SimResourceControllableContext, work: Double, speed: Double, duration: Long): Double { - return if (duration > 0L) { - return (duration / 1000.0) * speed - } else { - work - } - } + public fun onFinish(ctx: SimResourceControllableContext) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 2d53198a..10213f26 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -22,9 +22,6 @@ package org.opendc.simulator.resources -import kotlin.math.ceil -import kotlin.math.min - /** * A [SimResourceSource] represents a source for some resource that provides bounded processing capacity. * @@ -39,20 +36,13 @@ public class SimResourceSource( ) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { - override fun onIdle(ctx: SimResourceControllableContext, deadline: Long): Long { - return deadline - } - - override fun onConsume(ctx: SimResourceControllableContext, work: Double, limit: Double, deadline: Long): Long { - return if (work.isInfinite()) { - Long.MAX_VALUE - } else { - min(deadline, ctx.clock.millis() + getDuration(work, speed)) - } - } - - override fun onUpdate(ctx: SimResourceControllableContext, work: Double, willOvercommit: Boolean) { - updateCounters(ctx, work, willOvercommit) + override fun onUpdate( + ctx: SimResourceControllableContext, + delta: Long, + limit: Double, + willOvercommit: Boolean + ) { + updateCounters(ctx, delta, limit, willOvercommit) } override fun onFinish(ctx: SimResourceControllableContext) { @@ -62,11 +52,4 @@ public class SimResourceSource( } override fun toString(): String = "SimResourceSource[capacity=$capacity]" - - /** - * Compute the duration that a resource consumption will take with the specified [speed]. - */ - private fun getDuration(work: Double, speed: Double): Long { - return ceil(work / speed * 1000).toLong() - } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index cec27e1c..68bedbd9 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -100,19 +100,19 @@ public class SimResourceTransformer( } } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { val delegate = delegate if (!hasDelegateStarted) { start() } - updateCounters(ctx) + updateCounters(ctx, delta) return if (delegate != null) { - val command = transform(ctx, delegate.onNext(ctx)) + val command = transform(ctx, delegate.onNext(ctx, now, delta)) - _work = if (command is SimResourceCommand.Consume) command.work else 0.0 + _limit = if (command is SimResourceCommand.Consume) command.limit else 0.0 if (command == SimResourceCommand.Exit) { // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we @@ -124,12 +124,12 @@ public class SimResourceTransformer( if (isCoupled) SimResourceCommand.Exit else - onNext(ctx) + onNext(ctx, now, delta) } else { command } } else { - SimResourceCommand.Idle() + SimResourceCommand.Consume(0.0) } } @@ -180,19 +180,21 @@ public class SimResourceTransformer( } /** - * Counter to track the current submitted work. + * The requested speed. */ - private var _work = 0.0 + private var _limit: Double = 0.0 /** * Update the resource counters for the transformer. */ - private fun updateCounters(ctx: SimResourceContext) { + private fun updateCounters(ctx: SimResourceContext, delta: Long) { val counters = _counters - val remainingWork = ctx.remainingWork - counters.demand += _work - counters.actual += _work - remainingWork - counters.overcommit += remainingWork + val deltaS = delta / 1000.0 + val work = _limit * deltaS + val actualWork = ctx.speed * deltaS + counters.demand += work + counters.actual += actualWork + counters.overcommit += (work - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt index 4f4ebb14..1f8434b7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt @@ -50,8 +50,8 @@ public class SimSpeedConsumerAdapter( callback(0.0) } - override fun onNext(ctx: SimResourceContext): SimResourceCommand { - return delegate.onNext(ctx) + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + return delegate.onNext(ctx, now, delta) } override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt index 2e94e1c1..e5173e5f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt @@ -34,20 +34,11 @@ import org.opendc.simulator.resources.SimResourceEvent public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer { private var iterator: Iterator<Fragment>? = null - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { val iterator = checkNotNull(iterator) return if (iterator.hasNext()) { - val now = ctx.clock.millis() val fragment = iterator.next() - val work = (fragment.duration / 1000) * fragment.usage - val deadline = now + fragment.duration - - assert(deadline >= now) { "Deadline already passed" } - - if (work > 0.0) - SimResourceCommand.Consume(work, fragment.usage, deadline) - else - SimResourceCommand.Idle(deadline) + SimResourceCommand.Consume(fragment.usage, fragment.duration) } else { SimResourceCommand.Exit } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt index faa693c4..ae837043 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.resources.consumer import org.opendc.simulator.resources.SimResourceCommand import org.opendc.simulator.resources.SimResourceConsumer import org.opendc.simulator.resources.SimResourceContext +import kotlin.math.roundToLong /** * A [SimResourceConsumer] that consumes the specified amount of work at the specified utilization. @@ -39,18 +40,19 @@ public class SimWorkConsumer( require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } - private var isFirst = true + private var remainingWork = work - override fun onNext(ctx: SimResourceContext): SimResourceCommand { + override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand { + val actualWork = ctx.speed * delta / 1000.0 val limit = ctx.capacity * utilization - val work = if (isFirst) { - isFirst = false - work - } else { - ctx.remainingWork - } - return if (work > 0.0) { - SimResourceCommand.Consume(work, limit) + + remainingWork -= actualWork + + val remainingWork = remainingWork + val duration = (remainingWork / limit * 1000).roundToLong() + + return if (duration > 0) { + SimResourceCommand.Consume(limit, duration) } else { SimResourceCommand.Exit } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index b79998a3..a9507e52 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -58,12 +58,6 @@ internal class SimResourceContextImpl( } /** - * The amount of work still remaining at this instant. - */ - override val remainingWork: Double - get() = getRemainingWork(_clock.millis()) - - /** * A flag to indicate the state of the context. */ override val state: SimResourceState @@ -87,8 +81,8 @@ internal class SimResourceContextImpl( * The current state of the resource context. */ private var _timestamp: Long = Long.MIN_VALUE - private var _work: Double = 0.0 private var _limit: Double = 0.0 + private var _duration: Long = Long.MAX_VALUE private var _deadline: Long = Long.MAX_VALUE /** @@ -178,7 +172,6 @@ internal class SimResourceContextImpl( } catch (cause: Throwable) { doFail(cause) } finally { - _remainingWorkFlush = Long.MIN_VALUE _timestamp = timestamp } } @@ -200,29 +193,25 @@ internal class SimResourceContextImpl( SimResourceState.Pending, SimResourceState.Stopped -> state SimResourceState.Active -> { val isInterrupted = _flag and FLAG_INTERRUPT != 0 - val remainingWork = getRemainingWork(timestamp) - val isConsume = _limit > 0.0 val reachedDeadline = _deadline <= timestamp + val delta = max(0, timestamp - _timestamp) // Update the resource counters only if there is some progress if (timestamp > _timestamp) { - logic.onUpdate(this, _work, reachedDeadline) + logic.onUpdate(this, delta, _limit, reachedDeadline) } // We should only continue processing the next command if: // 1. The resource consumption was finished. // 2. The resource capacity cannot satisfy the demand. // 3. The resource consumer should be interrupted (e.g., someone called .interrupt()) - if ((isConsume && remainingWork == 0.0) || reachedDeadline || isInterrupted) { - when (val command = consumer.onNext(this)) { - is SimResourceCommand.Idle -> interpretIdle(timestamp, command.deadline) - is SimResourceCommand.Consume -> interpretConsume(timestamp, command.work, command.limit, command.deadline) + if (reachedDeadline || isInterrupted) { + when (val command = consumer.onNext(this, timestamp, delta)) { + is SimResourceCommand.Consume -> interpretConsume(timestamp, command.limit, command.duration) is SimResourceCommand.Exit -> interpretExit() } - } else if (isConsume) { - interpretConsume(timestamp, remainingWork, _limit, _deadline) } else { - interpretIdle(timestamp, _deadline) + interpretConsume(timestamp, _limit, _duration - delta) } } } @@ -257,32 +246,15 @@ internal class SimResourceContextImpl( /** * Interpret the [SimResourceCommand.Consume] command. */ - private fun interpretConsume(now: Long, work: Double, limit: Double, deadline: Long): SimResourceState { - require(deadline >= now) { "Deadline already passed" } - + private fun interpretConsume(now: Long, limit: Double, duration: Long): SimResourceState { _speed = min(capacity, limit) - _work = work _limit = limit - _deadline = deadline + _duration = duration - val timestamp = logic.onConsume(this, work, limit, deadline) - scheduleUpdate(timestamp) + val timestamp = logic.onConsume(this, now, limit, duration) - return SimResourceState.Active - } - - /** - * Interpret the [SimResourceCommand.Idle] command. - */ - private fun interpretIdle(now: Long, deadline: Long): SimResourceState { - require(deadline >= now) { "Deadline already passed" } - - _speed = 0.0 - _work = 0.0 - _limit = 0.0 - _deadline = deadline + _deadline = timestamp - val timestamp = logic.onIdle(this, deadline) scheduleUpdate(timestamp) return SimResourceState.Active @@ -293,37 +265,13 @@ internal class SimResourceContextImpl( */ private fun interpretExit(): SimResourceState { _speed = 0.0 - _work = 0.0 _limit = 0.0 + _duration = Long.MAX_VALUE _deadline = Long.MAX_VALUE return SimResourceState.Stopped } - private var _remainingWork: Double = 0.0 - private var _remainingWorkFlush: Long = Long.MIN_VALUE - - /** - * Obtain the remaining work at the given timestamp. - */ - private fun getRemainingWork(now: Long): Double { - return if (_remainingWorkFlush < now) { - _remainingWorkFlush = now - computeRemainingWork(now).also { _remainingWork = it } - } else { - _remainingWork - } - } - - /** - * Compute the remaining work based on the current state. - */ - private fun computeRemainingWork(now: Long): Double { - return if (_work > 0.0) - max(0.0, _work - logic.getConsumedWork(this, _work, speed, now - _timestamp)) - else 0.0 - } - /** * Indicate that the capacity of the resource has changed. */ @@ -333,16 +281,11 @@ internal class SimResourceContextImpl( return } - val isThrottled = speed > capacity - interpreter.batch { // Inform the consumer of the capacity change. This might already trigger an interrupt. consumer.onEvent(this, SimResourceEvent.Capacity) - // Optimization: only invalidate context if the new capacity cannot satisfy the active resource command. - if (isThrottled) { - invalidate() - } + interrupt() } } |
