diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-27 14:22:02 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-03 17:17:38 +0200 |
| commit | e07a5357013b92377a840b4d0d394d0ef6605b26 (patch) | |
| tree | 4569d9b14fa7326df6b7d7803105ea9ed0580506 | |
| parent | 15899c88d29c039149f701e7f0d538a49a436599 (diff) | |
refactor(simulator): Remove onUpdate callback
This change removes the `onUpdate` callback from the
`SimResourceProviderLogic` interface. Instead, users should now update
counters using either `onConsume` or `onConverge`.
12 files changed, 201 insertions, 197 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 12336308..16085d82 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -118,7 +118,7 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3088047, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) @@ -211,7 +211,7 @@ class CapelinIntegrationTest { assertAll( { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12027839, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(476163, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -252,8 +252,8 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(11132222, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9606178, exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt index d064d7fa..621ea6e7 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt @@ -90,26 +90,22 @@ public abstract class SimAbstractResourceAggregator( _output.interrupt() } - private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) { + private val _output = object : SimAbstractResourceProvider(interpreter, initialCapacity = 0.0) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { - override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { + override fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) { + updateCounters(ctx, delta) doConsume(limit) - return super.onConsume(ctx, now, limit, duration) } - override fun onFinish(ctx: SimResourceControllableContext) { + override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { + updateCounters(ctx, delta) doFinish() } - override fun onUpdate( - ctx: SimResourceControllableContext, - delta: Long, - limit: Double, - willOvercommit: Boolean - ) { - updateCounters(ctx, delta, limit, willOvercommit) + override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { + parent?.onConverge(now) } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt index 548bc228..085cba63 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt @@ -29,7 +29,6 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl */ public abstract class SimAbstractResourceProvider( private val interpreter: SimResourceInterpreter, - private val parent: SimResourceSystem?, initialCapacity: Double ) : SimResourceProvider { /** @@ -85,25 +84,30 @@ public abstract class SimAbstractResourceProvider( } /** + * The previous demand for the resource. + */ + private var previousDemand = 0.0 + + /** * Update the counters of the resource provider. */ - protected fun updateCounters(ctx: SimResourceContext, delta: Long, limit: Double, willOvercommit: Boolean) { - if (delta <= 0.0) { + protected fun updateCounters(ctx: SimResourceContext, delta: Long) { + val demand = previousDemand + previousDemand = ctx.demand + + if (delta <= 0) { return } val counters = _counters val deltaS = delta / 1000.0 - val work = limit * deltaS + val work = demand * deltaS val actualWork = ctx.speed * deltaS val remainingWork = work - actualWork counters.demand += work counters.actual += actualWork - - if (willOvercommit && remainingWork > 0.0) { - counters.overcommit += remainingWork - } + counters.overcommit += remainingWork } /** @@ -118,7 +122,7 @@ public abstract class SimAbstractResourceProvider( final override fun startConsumer(consumer: SimResourceConsumer) { check(ctx == null) { "Resource is in invalid state" } - val ctx = interpreter.newContext(consumer, createLogic(), parent) + val ctx = interpreter.newContext(consumer, createLogic()) ctx.capacity = capacity this.ctx = ctx diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt index eac58410..7df940ad 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt @@ -93,46 +93,6 @@ public class SimResourceDistributorMaxMin( /* SimResourceConsumer */ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long { - return doNext(ctx, now) - } - - override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { - when (event) { - SimResourceEvent.Start -> { - this.ctx = ctx - updateCapacity(ctx) - } - SimResourceEvent.Exit -> { - val iterator = _outputs.iterator() - while (iterator.hasNext()) { - val output = iterator.next() - - // Remove the output from the outputs to prevent ConcurrentModificationException when removing it - // during the call to output.close() - iterator.remove() - - output.close() - } - } - SimResourceEvent.Capacity -> updateCapacity(ctx) - else -> {} - } - } - - /** - * Extended [SimResourceCounters] interface for the distributor. - */ - public interface Counters : SimResourceCounters { - /** - * The amount of work lost due to interference. - */ - public val interference: Double - } - - /** - * Schedule the work of the outputs. - */ - private fun doNext(ctx: SimResourceContext, now: Long): Long { // If there is no work yet, mark the input as idle. if (activeOutputs.isEmpty()) { return Long.MAX_VALUE @@ -198,6 +158,39 @@ public class SimResourceDistributorMaxMin( return duration } + override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) { + when (event) { + SimResourceEvent.Start -> { + this.ctx = ctx + updateCapacity(ctx) + } + SimResourceEvent.Exit -> { + val iterator = _outputs.iterator() + while (iterator.hasNext()) { + val output = iterator.next() + + // Remove the output from the outputs to prevent ConcurrentModificationException when removing it + // during the call to output.close() + iterator.remove() + + output.close() + } + } + SimResourceEvent.Capacity -> updateCapacity(ctx) + else -> {} + } + } + + /** + * Extended [SimResourceCounters] interface for the distributor. + */ + public interface Counters : SimResourceCounters { + /** + * The amount of work lost due to interference. + */ + public val interference: Double + } + private fun updateCapacity(ctx: SimResourceContext) { for (output in _outputs) { output.capacity = ctx.capacity @@ -208,7 +201,7 @@ public class SimResourceDistributorMaxMin( * An internal [SimResourceProvider] implementation for switch outputs. */ private inner class Output(capacity: Double, val key: InterferenceKey?) : - SimAbstractResourceProvider(interpreter, parent, capacity), + SimAbstractResourceProvider(interpreter, capacity), SimResourceCloseableProvider, SimResourceProviderLogic, Comparable<Output> { @@ -263,17 +256,54 @@ public class SimResourceDistributorMaxMin( } /* SimResourceProviderLogic */ - override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { + override fun onConsume( + ctx: SimResourceControllableContext, + now: Long, + delta: Long, + limit: Double, + duration: Long + ) { + doUpdateCounters(delta) + allowedSpeed = min(ctx.capacity, limit) + actualSpeed = 0.0 this.limit = limit this.duration = duration lastCommandTimestamp = now + } + + override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + + override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { + doUpdateCounters(delta) + + limit = 0.0 + duration = Long.MAX_VALUE + actualSpeed = 0.0 + allowedSpeed = 0.0 + lastCommandTimestamp = now + } - return super.onConsume(ctx, now, limit, duration) + /* Comparable */ + override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed) + + /** + * Pull the next command if necessary. + */ + fun pull(now: Long) { + val ctx = ctx + if (ctx != null && lastCommandTimestamp < now) { + ctx.flush() + } } - override fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) { - if (delta <= 0.0) { + /** + * Helper method to update the resource counters of the distributor. + */ + private fun doUpdateCounters(delta: Long) { + if (delta <= 0L) { return } @@ -289,38 +319,14 @@ public class SimResourceDistributorMaxMin( val work = limit * deltaS val actualWork = actualSpeed * deltaS val remainingWork = work - actualWork - val overcommit = if (willOvercommit && remainingWork > 0.0) { - remainingWork - } else { - 0.0 - } - updateCounters(work, actualWork, overcommit) + updateCounters(work, actualWork, remainingWork) val distCounters = _counters distCounters.demand += work distCounters.actual += actualWork - distCounters.overcommit += overcommit + distCounters.overcommit += remainingWork distCounters.interference += actualWork * max(0.0, 1 - perfScore) } - - override fun onFinish(ctx: SimResourceControllableContext) { - limit = 0.0 - duration = Long.MAX_VALUE - lastCommandTimestamp = ctx.clock.millis() - } - - /* Comparable */ - override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed) - - /** - * Pull the next command if necessary. - */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && lastCommandTimestamp < now) { - ctx.flush() - } - } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt index 82631377..4bfeaf20 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt @@ -43,13 +43,8 @@ public interface SimResourceInterpreter { * * @param consumer The consumer logic. * @param provider The logic of the resource provider. - * @param parent The system to which the resource context belongs. */ - public fun newContext( - consumer: SimResourceConsumer, - provider: SimResourceProviderLogic, - parent: SimResourceSystem? = null - ): SimResourceControllableContext + public fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext /** * Start batching the execution of resource updates until [popBatch] is called. diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt index d8ff87f9..cc718165 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.resources /** - * The logic of a resource provider. + * A collection of callbacks associated with a flow stage. */ public interface SimResourceProviderLogic { /** @@ -31,29 +31,28 @@ public interface SimResourceProviderLogic { * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds. * @param limit The limit on the work rate of the resource consumer. * @param duration The duration of the consumption in milliseconds. * @return The deadline of the resource consumption. */ - public fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long { - return if (duration == Long.MAX_VALUE) { - return Long.MAX_VALUE - } else { - now + duration - } - } + public fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {} /** - * This method is invoked when the progress of the resource consumer is materialized. + * This method is invoked when the flow graph has converged into a steady-state system. * * @param ctx The context in which the provider runs. - * @param limit The limit on the work rate of the resource consumer. - * @param willOvercommit A flag to indicate that the remaining work is overcommitted. + * @param now The virtual timestamp in milliseconds at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {} + public fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {} /** * This method is invoked when the resource consumer has finished. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the provider finished. + * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds. */ - public fun onFinish(ctx: SimResourceControllableContext) {} + public fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {} } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt index 10213f26..c8d4cf0d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt @@ -33,21 +33,27 @@ public class SimResourceSource( initialCapacity: Double, private val interpreter: SimResourceInterpreter, private val parent: SimResourceSystem? = null -) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) { +) : SimAbstractResourceProvider(interpreter, initialCapacity) { override fun createLogic(): SimResourceProviderLogic { return object : SimResourceProviderLogic { - override fun onUpdate( + override fun onConsume( ctx: SimResourceControllableContext, + now: Long, delta: Long, limit: Double, - willOvercommit: Boolean + duration: Long ) { - updateCounters(ctx, delta, limit, willOvercommit) + updateCounters(ctx, delta) } - override fun onFinish(ctx: SimResourceControllableContext) { + override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) { + updateCounters(ctx, delta) cancel() } + + override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) { + parent?.onConverge(now) + } } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt index a317f832..397463e0 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt @@ -73,16 +73,16 @@ public class SimResourceTransformer( override fun close() { val delegate = checkNotNull(delegate) { "Delegate not active" } + if (isCoupled) + _ctx?.close() + else + _ctx?.push(0.0) + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards reset() delegate.onEvent(this, SimResourceEvent.Exit) - - if (isCoupled) - _ctx?.close() - else - _ctx?.push(0.0) } } @@ -213,6 +213,10 @@ public class SimResourceTransformer( * Update the resource counters for the transformer. */ private fun updateCounters(ctx: SimResourceContext, delta: Long) { + if (delta <= 0) { + return + } + val counters = _counters val deltaS = delta / 1000.0 val work = _limit * deltaS diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index 78d79434..5a9ffe2d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -32,11 +32,10 @@ import kotlin.math.min * Implementation of a [SimResourceContext] managing the communication between resources and resource consumers. */ internal class SimResourceContextImpl( - override val parent: SimResourceSystem?, private val interpreter: SimResourceInterpreterImpl, private val consumer: SimResourceConsumer, private val logic: SimResourceProviderLogic -) : SimResourceControllableContext, SimResourceSystem { +) : SimResourceControllableContext { /** * The clock of the context. */ @@ -84,7 +83,6 @@ internal class SimResourceContextImpl( private var _limit: Double = 0.0 private var _activeLimit: Double = 0.0 private var _deadline: Long = Long.MIN_VALUE - private var _lastUpdate: Long = Long.MIN_VALUE /** * A flag to indicate that an update is active. @@ -97,6 +95,12 @@ internal class SimResourceContextImpl( private var _flag: Int = 0 /** + * The timestamp of calls to the callbacks. + */ + private var _lastUpdate: Long = Long.MIN_VALUE + private var _lastConvergence: Long = Long.MAX_VALUE + + /** * The timers at which the context is scheduled to be interrupted. */ private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque() @@ -111,12 +115,20 @@ internal class SimResourceContextImpl( } override fun close() { - if (_state != SimResourceState.Stopped) { - interpreter.batch { - _state = SimResourceState.Stopped - if (!_updateActive) { - doStop() - } + if (_state == SimResourceState.Stopped) { + return + } + + interpreter.batch { + _state = SimResourceState.Stopped + if (!_updateActive) { + val now = clock.millis() + val delta = max(0, now - _lastUpdate) + doStop(now, delta) + + // FIX: Make sure the context converges + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(clock.millis()) } } } @@ -166,7 +178,7 @@ internal class SimResourceContextImpl( */ fun shouldUpdate(timestamp: Long): Boolean { // Either the resource context is flagged or there is a pending update at this timestamp - return _flag != 0 || _deadline == timestamp + return _flag != 0 || _limit != _activeLimit || _deadline == timestamp } /** @@ -179,18 +191,13 @@ internal class SimResourceContextImpl( } val lastUpdate = _lastUpdate + _lastUpdate = now _updateActive = true - val reachedDeadline = _deadline <= now val delta = max(0, now - lastUpdate) try { - // Update the resource counters only if there is some progress - if (now > lastUpdate) { - logic.onUpdate(this, delta, _activeLimit, reachedDeadline) - } - val duration = consumer.onNext(this, now, delta) val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration @@ -200,12 +207,12 @@ internal class SimResourceContextImpl( // Check whether the state has changed after [consumer.onNext] when (_state) { SimResourceState.Active -> { - logic.onConsume(this, now, _limit, duration) + logic.onConsume(this, now, delta, _limit, duration) // Schedule an update at the new deadline scheduleUpdate(now, newDeadline) } - SimResourceState.Stopped -> doStop() + SimResourceState.Stopped -> doStop(now, delta) SimResourceState.Pending -> throw IllegalStateException("Illegal transition to pending state") } @@ -217,18 +224,12 @@ internal class SimResourceContextImpl( _deadline = newDeadline _rate = min(capacity, newLimit) } catch (cause: Throwable) { - doFail(cause) + doFail(now, delta, cause) } finally { _updateActive = false } } - override fun onConverge(timestamp: Long) { - if (_state == SimResourceState.Active) { - consumer.onEvent(this, SimResourceEvent.Run) - } - } - /** * Prune the elapsed timers from this context. */ @@ -253,17 +254,35 @@ internal class SimResourceContextImpl( } } + /** + * This method is invoked when the system converges into a steady state. + */ + fun onConverge(timestamp: Long) { + val delta = max(0, timestamp - _lastConvergence) + _lastConvergence = timestamp + + try { + if (_state == SimResourceState.Active) { + consumer.onEvent(this, SimResourceEvent.Run) + } + + logic.onConverge(this, timestamp, delta) + } catch (cause: Throwable) { + doFail(timestamp, max(0, timestamp - _lastUpdate), cause) + } + } + override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]" /** * Stop the resource context. */ - private fun doStop() { + private fun doStop(now: Long, delta: Long) { try { consumer.onEvent(this, SimResourceEvent.Exit) - logic.onFinish(this) + logic.onFinish(this, now, delta) } catch (cause: Throwable) { - doFail(cause) + doFail(now, delta, cause) } finally { _deadline = Long.MAX_VALUE _limit = 0.0 @@ -273,7 +292,7 @@ internal class SimResourceContextImpl( /** * Fail the resource consumer. */ - private fun doFail(cause: Throwable) { + private fun doFail(now: Long, delta: Long, cause: Throwable) { try { consumer.onFailure(this, cause) } catch (e: Throwable) { @@ -281,7 +300,7 @@ internal class SimResourceContextImpl( e.printStackTrace() } - logic.onFinish(this) + logic.onFinish(this, now, delta) } /** diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt index 2b6ec2ba..2abf0749 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -63,7 +63,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * The systems that have been visited during the interpreter cycle. */ - private val visited = linkedSetOf<SimResourceSystem>() + private val visited = linkedSetOf<SimResourceContextImpl>() /** * The index in the batch stack. @@ -81,10 +81,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, */ fun scheduleSync(now: Long, ctx: SimResourceContextImpl) { ctx.doUpdate(now) - - if (visited.add(ctx)) { - collectAncestors(ctx, visited) - } + visited.add(ctx) // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked // up by the active interpreter. @@ -143,11 +140,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, return timer } - override fun newContext( - consumer: SimResourceConsumer, - provider: SimResourceProviderLogic, - parent: SimResourceSystem? - ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider) + override fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext = SimResourceContextImpl(this, consumer, provider) override fun pushBatch() { batchIndex++ @@ -200,10 +193,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, if (ctx.shouldUpdate(now)) { ctx.doUpdate(now) - - if (visited.add(ctx)) { - collectAncestors(ctx, visited) - } + visited.add(ctx) } else { ctx.tryReschedule(now) } @@ -218,10 +208,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, if (ctx.shouldUpdate(now)) { ctx.doUpdate(now) - - if (visited.add(ctx)) { - collectAncestors(ctx, visited) - } + visited.add(ctx) } } @@ -240,17 +227,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * Collect all the ancestors of the specified [system]. - */ - private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) { - val parent = system.parent - if (parent != null) { - systems.add(parent) - collectAncestors(parent, systems) - } - } - - /** * Try to schedule an interpreter invocation at the specified [target]. * * @param now The current virtual timestamp. diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index c7230a0e..1428ce42 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -50,7 +50,7 @@ class SimResourceContextTest { } val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(null, interpreter, consumer, logic) + val context = SimResourceContextImpl(interpreter, consumer, logic) interpreter.scheduleSync(interpreter.clock.millis(), context) } @@ -60,18 +60,15 @@ class SimResourceContextTest { val interpreter = SimResourceInterpreterImpl(coroutineContext, clock) val consumer = SimWorkConsumer(1.0, 1.0) - val logic = spyk(object : SimResourceProviderLogic { - override fun onFinish(ctx: SimResourceControllableContext) {} - override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration - }) - val context = SimResourceContextImpl(null, interpreter, consumer, logic) + val logic = spyk(object : SimResourceProviderLogic {}) + val context = SimResourceContextImpl(interpreter, consumer, logic) context.capacity = 1.0 context.start() delay(1) // Delay 1 ms to prevent hitting the fast path interpreter.scheduleSync(interpreter.clock.millis(), context) - verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } + verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } } @Test @@ -80,7 +77,7 @@ class SimResourceContextTest { val consumer = SimWorkConsumer(1.0, 1.0) val logic = spyk(object : SimResourceProviderLogic {}) - val context = SimResourceContextImpl(null, interpreter, consumer, logic) + val context = SimResourceContextImpl(interpreter, consumer, logic) context.capacity = 1.0 context.start() @@ -90,8 +87,8 @@ class SimResourceContextTest { context.invalidate() assertAll( - { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } }, - { verify(exactly = 1) { logic.onFinish(any()) } } + { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } }, + { verify(exactly = 1) { logic.onFinish(any(), any(), any()) } } ) } @@ -111,7 +108,7 @@ class SimResourceContextTest { } val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(null, interpreter, consumer, logic) + val context = SimResourceContextImpl(interpreter, consumer, logic) context.start() @@ -136,8 +133,7 @@ class SimResourceContextTest { }) val logic = object : SimResourceProviderLogic {} - - val context = SimResourceContextImpl(null, interpreter, consumer, logic) + val context = SimResourceContextImpl(interpreter, consumer, logic) context.capacity = 4200.0 context.start() context.capacity = 4200.0 @@ -166,7 +162,7 @@ class SimResourceContextTest { val logic = object : SimResourceProviderLogic {} - val context = SimResourceContextImpl(null, interpreter, consumer, logic) + val context = SimResourceContextImpl(interpreter, consumer, logic) context.start() diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt index fc43c3da..d7d0924f 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt @@ -223,6 +223,9 @@ internal class SimResourceTransformerTest { forwarder.consume(consumer) + yield() + + assertEquals(2.0, source.counters.actual) assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } |
