summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-29 16:52:30 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:37 +0200
commit657deac134f7b9ee30ed7e2b7667e30f3b17f79f (patch)
treeb778567ac2c07a7bc459a10a50d0036641a1db41 /opendc-simulator/opendc-simulator-resources
parent02fa44c0b116ff51c4cbe2876d8b2a225ed68553 (diff)
perf(simulator): Reduce memory allocations in SimResourceInterpreter
This change removes unnecessary allocations in the SimResourceInterpreter caused by the way timers were allocated for the resource context.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt71
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt199
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt4
3 files changed, 126 insertions, 148 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index d7ea0043..9cbf849d 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -24,6 +24,7 @@ package org.opendc.simulator.resources.impl
import org.opendc.simulator.resources.*
import java.time.Clock
+import java.util.ArrayDeque
import kotlin.math.max
import kotlin.math.min
@@ -96,9 +97,9 @@ internal class SimResourceContextImpl(
private var _flag: Int = 0
/**
- * The current pending update.
+ * The timers at which the context is scheduled to be interrupted.
*/
- private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null
+ private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque()
override fun start() {
check(_state == SimResourceState.Pending) { "Consumer is already started" }
@@ -126,7 +127,7 @@ internal class SimResourceContextImpl(
}
_flag = _flag or FLAG_INTERRUPT
- scheduleUpdate()
+ scheduleUpdate(clock.millis())
}
override fun invalidate() {
@@ -135,7 +136,7 @@ internal class SimResourceContextImpl(
}
_flag = _flag or FLAG_INVALIDATE
- scheduleUpdate()
+ scheduleUpdate(clock.millis())
}
override fun flush() {
@@ -143,7 +144,7 @@ internal class SimResourceContextImpl(
return
}
- interpreter.scheduleSync(this)
+ interpreter.scheduleSync(clock.millis(), this)
}
override fun push(rate: Double) {
@@ -154,9 +155,9 @@ internal class SimResourceContextImpl(
/**
* Determine whether the state of the resource context should be updated.
*/
- fun requiresUpdate(timestamp: Long): Boolean {
+ fun shouldUpdate(timestamp: Long): Boolean {
// Either the resource context is flagged or there is a pending update at this timestamp
- return _flag != 0 || _pendingUpdate?.timestamp == timestamp
+ return _flag != 0 || _deadline == timestamp
}
/**
@@ -204,7 +205,7 @@ internal class SimResourceContextImpl(
_deadline = target
- scheduleUpdate(target)
+ scheduleUpdate(timestamp, target)
}
SimResourceState.Pending ->
if (oldState != SimResourceState.Pending) {
@@ -229,6 +230,30 @@ internal class SimResourceContextImpl(
}
}
+ /**
+ * Prune the elapsed timers from this context.
+ */
+ fun pruneTimers(now: Long) {
+ val timers = _timers
+ while (true) {
+ val head = timers.peek()
+ if (head == null || head.target > now) {
+ break
+ }
+ timers.poll()
+ }
+ }
+
+ /**
+ * Try to re-schedule the resource context in case it was skipped.
+ */
+ fun tryReschedule(now: Long) {
+ val deadline = _deadline
+ if (deadline > now && deadline != Long.MAX_VALUE) {
+ scheduleUpdate(now, deadline)
+ }
+ }
+
override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]"
/**
@@ -279,35 +304,17 @@ internal class SimResourceContextImpl(
/**
* Schedule an update for this resource context.
*/
- private fun scheduleUpdate() {
- // Cancel the pending update
- val pendingUpdate = _pendingUpdate
- if (pendingUpdate != null) {
- _pendingUpdate = null
- pendingUpdate.cancel()
- }
-
- interpreter.scheduleImmediate(this)
+ private fun scheduleUpdate(now: Long) {
+ interpreter.scheduleImmediate(now, this)
}
/**
* Schedule a delayed update for this resource context.
*/
- private fun scheduleUpdate(timestamp: Long) {
- val pendingUpdate = _pendingUpdate
- if (pendingUpdate != null) {
- if (pendingUpdate.timestamp == timestamp) {
- // Fast-path: A pending update for the same timestamp already exists
- return
- } else {
- // Cancel the old pending update
- _pendingUpdate = null
- pendingUpdate.cancel()
- }
- }
-
- if (timestamp != Long.MAX_VALUE) {
- _pendingUpdate = interpreter.scheduleDelayed(this, timestamp)
+ private fun scheduleUpdate(now: Long, target: Long) {
+ val timers = _timers
+ if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) {
+ timers.addFirst(interpreter.scheduleDelayed(now, this, target))
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
index c3dcebd0..2b6ec2ba 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt
@@ -53,7 +53,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* A priority queue containing the resource updates to be scheduled in the future.
*/
- private val futureQueue = PriorityQueue<Update>(compareBy { it.timestamp })
+ private val futureQueue = PriorityQueue<Timer>()
/**
* The stack of interpreter invocations to occur in the future.
@@ -77,13 +77,14 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
get() = batchIndex > 0
/**
- * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle.
- *
- * This method should be used when the state of a resource context is invalidated/interrupted and needs to be
- * re-computed. In case no interpreter is currently active, the interpreter will be started.
+ * Update the specified [ctx] synchronously.
*/
- fun scheduleImmediate(ctx: SimResourceContextImpl) {
- queue.add(ctx)
+ fun scheduleSync(now: Long, ctx: SimResourceContextImpl) {
+ ctx.doUpdate(now)
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
+ }
// In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
// up by the active interpreter.
@@ -93,21 +94,20 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
try {
batchIndex++
- runInterpreter()
+ runInterpreter(now)
} finally {
batchIndex--
}
}
/**
- * Update the specified [ctx] synchronously.
+ * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle.
+ *
+ * This method should be used when the state of a resource context is invalidated/interrupted and needs to be
+ * re-computed. In case no interpreter is currently active, the interpreter will be started.
*/
- fun scheduleSync(ctx: SimResourceContextImpl) {
- ctx.doUpdate(clock.millis())
-
- if (visited.add(ctx)) {
- collectAncestors(ctx, visited)
- }
+ fun scheduleImmediate(now: Long, ctx: SimResourceContextImpl) {
+ queue.add(ctx)
// In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked
// up by the active interpreter.
@@ -117,35 +117,30 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
try {
batchIndex++
- runInterpreter()
+ runInterpreter(now)
} finally {
batchIndex--
}
}
/**
- * Schedule the interpreter to run at [timestamp] to update the resource contexts.
+ * Schedule the interpreter to run at [target] to update the resource contexts.
*
* This method will override earlier calls to this method for the same [ctx].
*
+ * @param now The current virtual timestamp.
* @param ctx The resource context to which the event applies.
- * @param timestamp The timestamp when the interrupt should happen.
+ * @param target The timestamp when the interrupt should happen.
*/
- fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update {
- val now = clock.millis()
+ fun scheduleDelayed(now: Long, ctx: SimResourceContextImpl, target: Long): Timer {
val futureQueue = futureQueue
- require(timestamp >= now) { "Timestamp must be in the future" }
+ require(target >= now) { "Timestamp must be in the future" }
- val update = allocUpdate(ctx, timestamp)
- futureQueue.add(update)
+ val timer = Timer(ctx, target)
+ futureQueue.add(timer)
- // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference.
- if (futureQueue.peek() === update) {
- trySchedule(futureQueue, futureInvocations)
- }
-
- return update
+ return timer
}
override fun newContext(
@@ -162,7 +157,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
try {
// Flush the work if the platform is not already running
if (batchIndex == 1 && queue.isNotEmpty()) {
- runInterpreter()
+ runInterpreter(clock.millis())
}
} finally {
batchIndex--
@@ -172,37 +167,46 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
/**
* Interpret all actions that are scheduled for the current timestamp.
*/
- private fun runInterpreter() {
- val now = clock.millis()
+ private fun runInterpreter(now: Long) {
val queue = queue
val futureQueue = futureQueue
val futureInvocations = futureInvocations
val visited = visited
+ // Remove any entries in the `futureInvocations` queue from the past
+ while (true) {
+ val head = futureInvocations.peek()
+ if (head == null || head.timestamp > now) {
+ break
+ }
+ futureInvocations.poll()
+ }
+
// Execute all scheduled updates at current timestamp
while (true) {
- val update = futureQueue.peek() ?: break
+ val timer = futureQueue.peek() ?: break
+ val ctx = timer.ctx
+ val target = timer.target
- assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" }
+ assert(target >= now) { "Internal inconsistency: found update of the past" }
- if (update.timestamp > now && !update.isCancelled) {
- // Schedule a task for the next event to occur.
- trySchedule(futureQueue, futureInvocations)
+ if (target > now) {
break
}
futureQueue.poll()
- val shouldExecute = !update.isCancelled && update.ctx.requiresUpdate(now)
- if (shouldExecute) {
- update.ctx.doUpdate(now)
+ ctx.pruneTimers(now)
- if (visited.add(update.ctx)) {
- collectAncestors(update.ctx, visited)
+ if (ctx.shouldUpdate(now)) {
+ ctx.doUpdate(now)
+
+ if (visited.add(ctx)) {
+ collectAncestors(ctx, visited)
}
+ } else {
+ ctx.tryReschedule(now)
}
-
- updatePool.add(update)
}
// Repeat execution of all immediate updates until the system has converged to a steady-state
@@ -211,9 +215,8 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
// Execute all immediate updates
while (true) {
val ctx = queue.poll() ?: break
- val shouldExecute = ctx.requiresUpdate(now)
- if (shouldExecute) {
+ if (ctx.shouldUpdate(now)) {
ctx.doUpdate(now)
if (visited.add(ctx)) {
@@ -228,51 +231,59 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
visited.clear()
} while (queue.isNotEmpty())
+
+ // Schedule an interpreter invocation for the next update to occur.
+ val headTimer = futureQueue.peek()
+ if (headTimer != null) {
+ trySchedule(now, futureInvocations, headTimer.target)
+ }
}
/**
- * Try to schedule the next interpreter event.
+ * Collect all the ancestors of the specified [system].
*/
- private fun trySchedule(queue: PriorityQueue<Update>, scheduled: ArrayDeque<Invocation>) {
- val nextTimer = queue.peek()
- val now = clock.millis()
-
- // Check whether we need to update our schedule:
- if (nextTimer == null) {
- // Case 1: all timers are cancelled
- for (invocation in scheduled) {
- invocation.cancel()
- }
- scheduled.clear()
- return
+ private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) {
+ val parent = system.parent
+ if (parent != null) {
+ systems.add(parent)
+ collectAncestors(parent, systems)
}
+ }
+ /**
+ * Try to schedule an interpreter invocation at the specified [target].
+ *
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the interpreter invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
+ */
+ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
while (true) {
val invocation = scheduled.peekFirst()
- if (invocation == null || invocation.timestamp > nextTimer.timestamp) {
+ if (invocation == null || invocation.timestamp > target) {
// Case 2: A new timer was registered ahead of the other timers.
// Solution: Schedule a new scheduler invocation
- val nextTimestamp = nextTimer.timestamp
@OptIn(InternalCoroutinesApi::class)
val handle = delay.invokeOnTimeout(
- nextTimestamp - now,
+ target - now,
{
try {
batchIndex++
- runInterpreter()
+ runInterpreter(target)
} finally {
batchIndex--
}
},
context
)
- scheduled.addFirst(Invocation(nextTimestamp, handle))
+ scheduled.addFirst(Invocation(target, handle))
break
- } else if (invocation.timestamp < nextTimer.timestamp) {
+ } else if (invocation.timestamp < target) {
// Case 2: A timer was cancelled and the head of the timer queue is now later than excepted
// Solution: Cancel the next scheduler invocation
- invocation.cancel()
scheduled.pollFirst()
+
+ invocation.cancel()
} else {
break
}
@@ -280,37 +291,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
}
/**
- * Collect all the ancestors of the specified [system].
- */
- private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) {
- val parent = system.parent
- if (parent != null) {
- systems.add(parent)
- collectAncestors(parent, systems)
- }
- }
-
- /**
- * The pool of existing updates.
- */
- private val updatePool = ArrayDeque<Update>()
-
- /**
- * Allocate an [Update] object.
- */
- private fun allocUpdate(ctx: SimResourceContextImpl, timestamp: Long): Update {
- val update = updatePool.poll()
- return if (update != null) {
- update.ctx = ctx
- update.timestamp = timestamp
- update.isCancelled = false
- update
- } else {
- Update(ctx, timestamp)
- }
- }
-
- /**
* A future interpreter invocation.
*
* This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case
@@ -318,34 +298,25 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext,
*/
private data class Invocation(
@JvmField val timestamp: Long,
- private val disposableHandle: DisposableHandle
+ @JvmField val handle: DisposableHandle
) {
/**
* Cancel the interpreter invocation.
*/
- fun cancel() = disposableHandle.dispose()
+ fun cancel() = handle.dispose()
}
/**
- * An update call for [ctx] that is scheduled for [timestamp].
+ * An update call for [ctx] that is scheduled for [target].
*
- * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be
+ * This class represents an update in the future at [target] requested by [ctx]. A deferred update might be
* cancelled if the resource context was invalidated in the meantime.
*/
- class Update(@JvmField var ctx: SimResourceContextImpl, @JvmField var timestamp: Long) {
- /**
- * A flag to indicate that the task has been cancelled.
- */
- @JvmField
- var isCancelled: Boolean = false
-
- /**
- * Cancel the update.
- */
- fun cancel() {
- isCancelled = true
+ class Timer(@JvmField val ctx: SimResourceContextImpl, @JvmField val target: Long) : Comparable<Timer> {
+ override fun compareTo(other: Timer): Int {
+ return target.compareTo(other.target)
}
- override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]"
+ override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]"
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index 4e57f598..e95e9e42 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -51,7 +51,7 @@ class SimResourceContextTest {
val logic = object : SimResourceProviderLogic {}
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
- context.doUpdate(interpreter.clock.millis())
+ interpreter.scheduleSync(interpreter.clock.millis(), context)
}
@Test
@@ -77,7 +77,7 @@ class SimResourceContextTest {
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
- context.doUpdate(interpreter.clock.millis())
+ interpreter.scheduleSync(interpreter.clock.millis(), context)
verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) }
}