summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-27 14:22:02 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:38 +0200
commite07a5357013b92377a840b4d0d394d0ef6605b26 (patch)
tree4569d9b14fa7326df6b7d7803105ea9ed0580506 /opendc-simulator
parent15899c88d29c039149f701e7f0d538a49a436599 (diff)
refactor(simulator): Remove onUpdate callback
This change removes the `onUpdate` callback from the `SimResourceProviderLogic` interface. Instead, users should now update counters using either `onConsume` or `onConverge`.
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt18
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt22
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt148
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt25
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt16
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt79
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt34
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt24
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt3
11 files changed, 197 insertions, 193 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index d064d7fa..621ea6e7 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -90,26 +90,22 @@ public abstract class SimAbstractResourceAggregator(
_output.interrupt()
}
- private val _output = object : SimAbstractResourceProvider(interpreter, parent, initialCapacity = 0.0) {
+ private val _output = object : SimAbstractResourceProvider(interpreter, initialCapacity = 0.0) {
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
- override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
+ override fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {
+ updateCounters(ctx, delta)
doConsume(limit)
- return super.onConsume(ctx, now, limit, duration)
}
- override fun onFinish(ctx: SimResourceControllableContext) {
+ override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ updateCounters(ctx, delta)
doFinish()
}
- override fun onUpdate(
- ctx: SimResourceControllableContext,
- delta: Long,
- limit: Double,
- willOvercommit: Boolean
- ) {
- updateCounters(ctx, delta, limit, willOvercommit)
+ override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
index 548bc228..085cba63 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceProvider.kt
@@ -29,7 +29,6 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl
*/
public abstract class SimAbstractResourceProvider(
private val interpreter: SimResourceInterpreter,
- private val parent: SimResourceSystem?,
initialCapacity: Double
) : SimResourceProvider {
/**
@@ -85,25 +84,30 @@ public abstract class SimAbstractResourceProvider(
}
/**
+ * The previous demand for the resource.
+ */
+ private var previousDemand = 0.0
+
+ /**
* Update the counters of the resource provider.
*/
- protected fun updateCounters(ctx: SimResourceContext, delta: Long, limit: Double, willOvercommit: Boolean) {
- if (delta <= 0.0) {
+ protected fun updateCounters(ctx: SimResourceContext, delta: Long) {
+ val demand = previousDemand
+ previousDemand = ctx.demand
+
+ if (delta <= 0) {
return
}
val counters = _counters
val deltaS = delta / 1000.0
- val work = limit * deltaS
+ val work = demand * deltaS
val actualWork = ctx.speed * deltaS
val remainingWork = work - actualWork
counters.demand += work
counters.actual += actualWork
-
- if (willOvercommit && remainingWork > 0.0) {
- counters.overcommit += remainingWork
- }
+ counters.overcommit += remainingWork
}
/**
@@ -118,7 +122,7 @@ public abstract class SimAbstractResourceProvider(
final override fun startConsumer(consumer: SimResourceConsumer) {
check(ctx == null) { "Resource is in invalid state" }
- val ctx = interpreter.newContext(consumer, createLogic(), parent)
+ val ctx = interpreter.newContext(consumer, createLogic())
ctx.capacity = capacity
this.ctx = ctx
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index eac58410..7df940ad 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -93,46 +93,6 @@ public class SimResourceDistributorMaxMin(
/* SimResourceConsumer */
override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return doNext(ctx, now)
- }
-
- override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
- when (event) {
- SimResourceEvent.Start -> {
- this.ctx = ctx
- updateCapacity(ctx)
- }
- SimResourceEvent.Exit -> {
- val iterator = _outputs.iterator()
- while (iterator.hasNext()) {
- val output = iterator.next()
-
- // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call to output.close()
- iterator.remove()
-
- output.close()
- }
- }
- SimResourceEvent.Capacity -> updateCapacity(ctx)
- else -> {}
- }
- }
-
- /**
- * Extended [SimResourceCounters] interface for the distributor.
- */
- public interface Counters : SimResourceCounters {
- /**
- * The amount of work lost due to interference.
- */
- public val interference: Double
- }
-
- /**
- * Schedule the work of the outputs.
- */
- private fun doNext(ctx: SimResourceContext, now: Long): Long {
// If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
return Long.MAX_VALUE
@@ -198,6 +158,39 @@ public class SimResourceDistributorMaxMin(
return duration
}
+ override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
+ when (event) {
+ SimResourceEvent.Start -> {
+ this.ctx = ctx
+ updateCapacity(ctx)
+ }
+ SimResourceEvent.Exit -> {
+ val iterator = _outputs.iterator()
+ while (iterator.hasNext()) {
+ val output = iterator.next()
+
+ // Remove the output from the outputs to prevent ConcurrentModificationException when removing it
+ // during the call to output.close()
+ iterator.remove()
+
+ output.close()
+ }
+ }
+ SimResourceEvent.Capacity -> updateCapacity(ctx)
+ else -> {}
+ }
+ }
+
+ /**
+ * Extended [SimResourceCounters] interface for the distributor.
+ */
+ public interface Counters : SimResourceCounters {
+ /**
+ * The amount of work lost due to interference.
+ */
+ public val interference: Double
+ }
+
private fun updateCapacity(ctx: SimResourceContext) {
for (output in _outputs) {
output.capacity = ctx.capacity
@@ -208,7 +201,7 @@ public class SimResourceDistributorMaxMin(
* An internal [SimResourceProvider] implementation for switch outputs.
*/
private inner class Output(capacity: Double, val key: InterferenceKey?) :
- SimAbstractResourceProvider(interpreter, parent, capacity),
+ SimAbstractResourceProvider(interpreter, capacity),
SimResourceCloseableProvider,
SimResourceProviderLogic,
Comparable<Output> {
@@ -263,17 +256,54 @@ public class SimResourceDistributorMaxMin(
}
/* SimResourceProviderLogic */
- override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
+ override fun onConsume(
+ ctx: SimResourceControllableContext,
+ now: Long,
+ delta: Long,
+ limit: Double,
+ duration: Long
+ ) {
+ doUpdateCounters(delta)
+
allowedSpeed = min(ctx.capacity, limit)
+ actualSpeed = 0.0
this.limit = limit
this.duration = duration
lastCommandTimestamp = now
+ }
+
+ override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
+
+ override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ doUpdateCounters(delta)
+
+ limit = 0.0
+ duration = Long.MAX_VALUE
+ actualSpeed = 0.0
+ allowedSpeed = 0.0
+ lastCommandTimestamp = now
+ }
- return super.onConsume(ctx, now, limit, duration)
+ /* Comparable */
+ override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed)
+
+ /**
+ * Pull the next command if necessary.
+ */
+ fun pull(now: Long) {
+ val ctx = ctx
+ if (ctx != null && lastCommandTimestamp < now) {
+ ctx.flush()
+ }
}
- override fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {
- if (delta <= 0.0) {
+ /**
+ * Helper method to update the resource counters of the distributor.
+ */
+ private fun doUpdateCounters(delta: Long) {
+ if (delta <= 0L) {
return
}
@@ -289,38 +319,14 @@ public class SimResourceDistributorMaxMin(
val work = limit * deltaS
val actualWork = actualSpeed * deltaS
val remainingWork = work - actualWork
- val overcommit = if (willOvercommit && remainingWork > 0.0) {
- remainingWork
- } else {
- 0.0
- }
- updateCounters(work, actualWork, overcommit)
+ updateCounters(work, actualWork, remainingWork)
val distCounters = _counters
distCounters.demand += work
distCounters.actual += actualWork
- distCounters.overcommit += overcommit
+ distCounters.overcommit += remainingWork
distCounters.interference += actualWork * max(0.0, 1 - perfScore)
}
-
- override fun onFinish(ctx: SimResourceControllableContext) {
- limit = 0.0
- duration = Long.MAX_VALUE
- lastCommandTimestamp = ctx.clock.millis()
- }
-
- /* Comparable */
- override fun compareTo(other: Output): Int = allowedSpeed.compareTo(other.allowedSpeed)
-
- /**
- * Pull the next command if necessary.
- */
- fun pull(now: Long) {
- val ctx = ctx
- if (ctx != null && lastCommandTimestamp < now) {
- ctx.flush()
- }
- }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
index 82631377..4bfeaf20 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceInterpreter.kt
@@ -43,13 +43,8 @@ public interface SimResourceInterpreter {
*
* @param consumer The consumer logic.
* @param provider The logic of the resource provider.
- * @param parent The system to which the resource context belongs.
*/
- public fun newContext(
- consumer: SimResourceConsumer,
- provider: SimResourceProviderLogic,
- parent: SimResourceSystem? = null
- ): SimResourceControllableContext
+ public fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext
/**
* Start batching the execution of resource updates until [popBatch] is called.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
index d8ff87f9..cc718165 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceProviderLogic.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.resources
/**
- * The logic of a resource provider.
+ * A collection of callbacks associated with a flow stage.
*/
public interface SimResourceProviderLogic {
/**
@@ -31,29 +31,28 @@ public interface SimResourceProviderLogic {
*
* @param ctx The context in which the provider runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
+ * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds.
* @param limit The limit on the work rate of the resource consumer.
* @param duration The duration of the consumption in milliseconds.
* @return The deadline of the resource consumption.
*/
- public fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
- return if (duration == Long.MAX_VALUE) {
- return Long.MAX_VALUE
- } else {
- now + duration
- }
- }
+ public fun onConsume(ctx: SimResourceControllableContext, now: Long, delta: Long, limit: Double, duration: Long) {}
/**
- * This method is invoked when the progress of the resource consumer is materialized.
+ * This method is invoked when the flow graph has converged into a steady-state system.
*
* @param ctx The context in which the provider runs.
- * @param limit The limit on the work rate of the resource consumer.
- * @param willOvercommit A flag to indicate that the remaining work is overcommitted.
+ * @param now The virtual timestamp in milliseconds at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
*/
- public fun onUpdate(ctx: SimResourceControllableContext, delta: Long, limit: Double, willOvercommit: Boolean) {}
+ public fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {}
/**
* This method is invoked when the resource consumer has finished.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the provider finished.
+ * @param delta The virtual duration between this call and the last call to [onConsume] in milliseconds.
*/
- public fun onFinish(ctx: SimResourceControllableContext) {}
+ public fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 10213f26..c8d4cf0d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -33,21 +33,27 @@ public class SimResourceSource(
initialCapacity: Double,
private val interpreter: SimResourceInterpreter,
private val parent: SimResourceSystem? = null
-) : SimAbstractResourceProvider(interpreter, parent, initialCapacity) {
+) : SimAbstractResourceProvider(interpreter, initialCapacity) {
override fun createLogic(): SimResourceProviderLogic {
return object : SimResourceProviderLogic {
- override fun onUpdate(
+ override fun onConsume(
ctx: SimResourceControllableContext,
+ now: Long,
delta: Long,
limit: Double,
- willOvercommit: Boolean
+ duration: Long
) {
- updateCounters(ctx, delta, limit, willOvercommit)
+ updateCounters(ctx, delta)
}
- override fun onFinish(ctx: SimResourceControllableContext) {
+ override fun onFinish(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ updateCounters(ctx, delta)
cancel()
}
+
+ override fun onConverge(ctx: SimResourceControllableContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index a317f832..397463e0 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -73,16 +73,16 @@ public class SimResourceTransformer(
override fun close() {
val delegate = checkNotNull(delegate) { "Delegate not active" }
+ if (isCoupled)
+ _ctx?.close()
+ else
+ _ctx?.push(0.0)
+
// Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
// reset beforehand the existing state and check whether it has been updated afterwards
reset()
delegate.onEvent(this, SimResourceEvent.Exit)
-
- if (isCoupled)
- _ctx?.close()
- else
- _ctx?.push(0.0)
}
}
@@ -213,6 +213,10 @@ public class SimResourceTransformer(
* Update the resource counters for the transformer.
*/
private fun updateCounters(ctx: SimResourceContext, delta: Long) {
+ if (delta <= 0) {
+ return
+ }
+
val counters = _counters
val deltaS = delta / 1000.0
val work = _limit * deltaS
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 78d79434..5a9ffe2d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -32,11 +32,10 @@ import kotlin.math.min
* Implementation of a [SimResourceContext] managing the communication between resources and resource consumers.
*/
internal class SimResourceContextImpl(
- override val parent: SimResourceSystem?,
private val interpreter: SimResourceInterpreterImpl,
private val consumer: SimResourceConsumer,
private val logic: SimResourceProviderLogic
-) : SimResourceControllableContext, SimResourceSystem {
+) : SimResourceControllableContext {
/**
* The clock of the context.
*/
@@ -84,7 +83,6 @@ internal class SimResourceContextImpl(
private var _limit: Double = 0.0
private var _activeLimit: Double = 0.0
private var _deadline: Long = Long.MIN_VALUE
- private var _lastUpdate: Long = Long.MIN_VALUE
/**
* A flag to indicate that an update is active.
@@ -97,6 +95,12 @@ internal class SimResourceContextImpl(
private var _flag: Int = 0
/**
+ * The timestamp of calls to the callbacks.
+ */
+ private var _lastUpdate: Long = Long.MIN_VALUE
+ private var _lastConvergence: Long = Long.MAX_VALUE
+
+ /**
* The timers at which the context is scheduled to be interrupted.
*/
private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque()
@@ -111,12 +115,20 @@ internal class SimResourceContextImpl(
}
override fun close() {
- if (_state != SimResourceState.Stopped) {
- interpreter.batch {
- _state = SimResourceState.Stopped
- if (!_updateActive) {
- doStop()
- }
+ if (_state == SimResourceState.Stopped) {
+ return
+ }
+
+ interpreter.batch {
+ _state = SimResourceState.Stopped
+ if (!_updateActive) {
+ val now = clock.millis()
+ val delta = max(0, now - _lastUpdate)
+ doStop(now, delta)
+
+ // FIX: Make sure the context converges
+ _flag = _flag or FLAG_INVALIDATE
+ scheduleUpdate(clock.millis())
}
}
}
@@ -166,7 +178,7 @@ internal class SimResourceContextImpl(
*/
fun shouldUpdate(timestamp: Long): Boolean {
// Either the resource context is flagged or there is a pending update at this timestamp
- return _flag != 0 || _deadline == timestamp
+ return _flag != 0 || _limit != _activeLimit || _deadline == timestamp
}
/**
@@ -179,18 +191,13 @@ internal class SimResourceContextImpl(
}
val lastUpdate = _lastUpdate
+
_lastUpdate = now
_updateActive = true
- val reachedDeadline = _deadline <= now
val delta = max(0, now - lastUpdate)
try {
- // Update the resource counters only if there is some progress
- if (now > lastUpdate) {
- logic.onUpdate(this, delta, _activeLimit, reachedDeadline)
- }
-
val duration = consumer.onNext(this, now, delta)
val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration
@@ -200,12 +207,12 @@ internal class SimResourceContextImpl(
// Check whether the state has changed after [consumer.onNext]
when (_state) {
SimResourceState.Active -> {
- logic.onConsume(this, now, _limit, duration)
+ logic.onConsume(this, now, delta, _limit, duration)
// Schedule an update at the new deadline
scheduleUpdate(now, newDeadline)
}
- SimResourceState.Stopped -> doStop()
+ SimResourceState.Stopped -> doStop(now, delta)
SimResourceState.Pending -> throw IllegalStateException("Illegal transition to pending state")
}
@@ -217,18 +224,12 @@ internal class SimResourceContextImpl(
_deadline = newDeadline
_rate = min(capacity, newLimit)
} catch (cause: Throwable) {
- doFail(cause)
+ doFail(now, delta, cause)
} finally {
_updateActive = false
}
}
- override fun onConverge(timestamp: Long) {
- if (_state == SimResourceState.Active) {
- consumer.onEvent(this, SimResourceEvent.Run)
- }
- }
-
/**
* Prune the elapsed timers from this context.
*/
@@ -253,17 +254,35 @@ internal class SimResourceContextImpl(
}
}
+ /**
+ * This method is invoked when the system converges into a steady state.
+ */
+ fun onConverge(timestamp: Long) {
+ val delta = max(0, timestamp - _lastConvergence)
+ _lastConvergence = timestamp
+
+ try {
+ if (_state == SimResourceState.Active) {
+ consumer.onEvent(this, SimResourceEvent.Run)
+ }
+
+ logic.onConverge(this, timestamp, delta)
+ } catch (cause: Throwable) {
+ doFail(timestamp, max(0, timestamp - _lastUpdate), cause)
+ }
+ }
+
override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]"
/**
* Stop the resource context.
*/
- private fun doStop() {
+ private fun doStop(now: Long, delta: Long) {
try {
consumer.onEvent(this, SimResourceEvent.Exit)
- logic.onFinish(this)
+ logic.onFinish(this, now, delta)
} catch (cause: Throwable) {
- doFail(cause)
+ doFail(now, delta, cause)
} finally {
_deadline = Long.MAX_VALUE
_limit = 0.0
@@ -273,7 +292,7 @@ internal class SimResourceContextImpl(
/**
* Fail the resource consumer.
*/
- private fun doFail(cause: Throwable) {
+ private fun doFail(now: Long, delta: Long, cause: Throwable) {
try {
consumer.onFailure(this, cause)
} catch (e: Throwable) {
@@ -281,7 +300,7 @@ internal class SimResourceContextImpl(
e.printStackTrace()
}
- logic.onFinish(this)
+ logic.onFinish(this, now, delta)
}
/**
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
index 2b6ec2ba..2abf0749 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -63,7 +63,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* The systems that have been visited during the interpreter cycle.
*/
- private val visited = linkedSetOf<SimResourceSystem>()
+ private val visited = linkedSetOf<SimResourceContextImpl>()
/**
* The index in the batch stack.
@@ -81,10 +81,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
*/
fun scheduleSync(now: Long, ctx: SimResourceContextImpl) {
ctx.doUpdate(now)
-
- if (visited.add(ctx)) {
- collectAncestors(ctx, visited)
- }
+ visited.add(ctx)
// In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
// up by the active interpreter.
@@ -143,11 +140,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
return timer
}
- override fun newContext(
- consumer: SimResourceConsumer,
- provider: SimResourceProviderLogic,
- parent: SimResourceSystem?
- ): SimResourceControllableContext = SimResourceContextImpl(parent, this, consumer, provider)
+ override fun newContext(consumer: SimResourceConsumer, provider: SimResourceProviderLogic): SimResourceControllableContext = SimResourceContextImpl(this, consumer, provider)
override fun pushBatch() {
batchIndex++
@@ -200,10 +193,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
if (ctx.shouldUpdate(now)) {
ctx.doUpdate(now)
-
- if (visited.add(ctx)) {
- collectAncestors(ctx, visited)
- }
+ visited.add(ctx)
} else {
ctx.tryReschedule(now)
}
@@ -218,10 +208,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
if (ctx.shouldUpdate(now)) {
ctx.doUpdate(now)
-
- if (visited.add(ctx)) {
- collectAncestors(ctx, visited)
- }
+ visited.add(ctx)
}
}
@@ -240,17 +227,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * Collect all the ancestors of the specified [system].
- */
- private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) {
- val parent = system.parent
- if (parent != null) {
- systems.add(parent)
- collectAncestors(parent, systems)
- }
- }
-
- /**
* Try to schedule an interpreter invocation at the specified [target].
*
* @param now The current virtual timestamp.
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index c7230a0e..1428ce42 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -50,7 +50,7 @@ class SimResourceContextTest {
}
val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ val context = SimResourceContextImpl(interpreter, consumer, logic)
interpreter.scheduleSync(interpreter.clock.millis(), context)
}
@@ -60,18 +60,15 @@ class SimResourceContextTest {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
val consumer = SimWorkConsumer(1.0, 1.0)
- val logic = spyk(object : SimResourceProviderLogic {
- override fun onFinish(ctx: SimResourceControllableContext) {}
- override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration
- })
- val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ val logic = spyk(object : SimResourceProviderLogic {})
+ val context = SimResourceContextImpl(interpreter, consumer, logic)
context.capacity = 1.0
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
interpreter.scheduleSync(interpreter.clock.millis(), context)
- verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) }
+ verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) }
}
@Test
@@ -80,7 +77,7 @@ class SimResourceContextTest {
val consumer = SimWorkConsumer(1.0, 1.0)
val logic = spyk(object : SimResourceProviderLogic {})
- val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ val context = SimResourceContextImpl(interpreter, consumer, logic)
context.capacity = 1.0
context.start()
@@ -90,8 +87,8 @@ class SimResourceContextTest {
context.invalidate()
assertAll(
- { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } },
- { verify(exactly = 1) { logic.onFinish(any()) } }
+ { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any(), any()) } },
+ { verify(exactly = 1) { logic.onFinish(any(), any(), any()) } }
)
}
@@ -111,7 +108,7 @@ class SimResourceContextTest {
}
val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ val context = SimResourceContextImpl(interpreter, consumer, logic)
context.start()
@@ -136,8 +133,7 @@ class SimResourceContextTest {
})
val logic = object : SimResourceProviderLogic {}
-
- val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ val context = SimResourceContextImpl(interpreter, consumer, logic)
context.capacity = 4200.0
context.start()
context.capacity = 4200.0
@@ -166,7 +162,7 @@ class SimResourceContextTest {
val logic = object : SimResourceProviderLogic {}
- val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ val context = SimResourceContextImpl(interpreter, consumer, logic)
context.start()
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
index fc43c3da..d7d0924f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceTransformerTest.kt
@@ -223,6 +223,9 @@ internal class SimResourceTransformerTest {
forwarder.consume(consumer)
+ yield()
+
+ assertEquals(2.0, source.counters.actual)
assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }