summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-28 22:10:12 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:37 +0200
commitd031a70f8bea02a86df7840c5ce731185df86883 (patch)
tree0eafce9c17be7d0818702c5b66cc9e568ab08981 /opendc-simulator/opendc-simulator-resources
parentd3b0b551362eb677c12047cba82a6279ea4608b4 (diff)
refactor(simulator): Invoke consumer callback on every invalidation
This change updates the simulator implementation to always invoke the `SimResourceConsumer.onNext` callback when the resource context is invalidated. This allows users to update the resource counter or do some other work if the context has changed.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt18
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt36
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt33
3 files changed, 35 insertions, 52 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index ad6b0108..4c0e075c 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -31,12 +31,20 @@ import org.opendc.simulator.resources.SimResourceEvent
* consumption for some period of time.
*/
public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
- private var iterator: Iterator<Fragment>? = null
+ private var _iterator: Iterator<Fragment>? = null
+ private var _nextTarget = Long.MIN_VALUE
override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- val iterator = checkNotNull(iterator)
+ // Check whether the trace fragment was fully consumed, otherwise wait until we have done so
+ val nextTarget = _nextTarget
+ if (nextTarget > now) {
+ return now - nextTarget
+ }
+
+ val iterator = checkNotNull(_iterator)
return if (iterator.hasNext()) {
val fragment = iterator.next()
+ _nextTarget = now + fragment.duration
ctx.push(fragment.usage)
fragment.duration
} else {
@@ -48,11 +56,11 @@ public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResour
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
when (event) {
SimResourceEvent.Start -> {
- check(iterator == null) { "Consumer already running" }
- iterator = trace.iterator()
+ check(_iterator == null) { "Consumer already running" }
+ _iterator = trace.iterator()
}
SimResourceEvent.Exit -> {
- iterator = null
+ _iterator = null
}
else -> {}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 9cbf849d..1ac38946 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -148,6 +148,10 @@ internal class SimResourceContextImpl(
}
override fun push(rate: Double) {
+ if (_limit == rate) {
+ return
+ }
+
_speed = min(capacity, rate)
_limit = rate
}
@@ -163,7 +167,7 @@ internal class SimResourceContextImpl(
/**
* Update the state of the resource context.
*/
- fun doUpdate(timestamp: Long) {
+ fun doUpdate(now: Long) {
val oldState = _state
if (oldState != SimResourceState.Active) {
return
@@ -171,41 +175,29 @@ internal class SimResourceContextImpl(
_updateActive = true
- val flag = _flag
- val isInterrupted = flag and FLAG_INTERRUPT != 0
- val reachedDeadline = _deadline <= timestamp
- val delta = max(0, timestamp - _timestamp)
+ val reachedDeadline = _deadline <= now
+ val delta = max(0, now - _timestamp)
try {
-
// Update the resource counters only if there is some progress
- if (timestamp > _timestamp) {
+ if (now > _timestamp) {
logic.onUpdate(this, delta, _limit, reachedDeadline)
}
- // We should only continue processing the next command if:
- // 1. The resource consumption was finished.
- // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
- val duration = if (reachedDeadline || isInterrupted) {
- consumer.onNext(this, timestamp, delta)
- } else {
- _deadline - timestamp
- }
+ val duration = consumer.onNext(this, now, delta)
// Reset update flags
_flag = 0
when (_state) {
SimResourceState.Active -> {
- val limit = _limit
- push(limit)
- _duration = duration
-
- val target = logic.onConsume(this, timestamp, limit, duration)
+ val target = logic.onConsume(this, now, _limit, duration)
+ _speed = min(capacity, _limit)
+ _duration = duration
_deadline = target
- scheduleUpdate(timestamp, target)
+ scheduleUpdate(now, target)
}
SimResourceState.Pending ->
if (oldState != SimResourceState.Pending) {
@@ -219,7 +211,7 @@ internal class SimResourceContextImpl(
} catch (cause: Throwable) {
doFail(cause)
} finally {
- _timestamp = timestamp
+ _timestamp = now
_updateActive = false
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
index e95e9e42..c7230a0e 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt
@@ -26,6 +26,7 @@ import io.mockk.*
import kotlinx.coroutines.*
import org.junit.jupiter.api.*
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.resources.consumer.SimWorkConsumer
import org.opendc.simulator.resources.impl.SimResourceContextImpl
import org.opendc.simulator.resources.impl.SimResourceInterpreterImpl
@@ -57,23 +58,14 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlush() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(4.0)
- 1000
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
+ val consumer = SimWorkConsumer(1.0, 1.0)
val logic = spyk(object : SimResourceProviderLogic {
override fun onFinish(ctx: SimResourceControllableContext) {}
override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long = duration
})
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ context.capacity = 1.0
context.start()
delay(1) // Delay 1 ms to prevent hitting the fast path
@@ -85,29 +77,20 @@ class SimResourceContextTest {
@Test
fun testIntermediateFlushIdle() = runBlockingSimulation {
val interpreter = SimResourceInterpreterImpl(coroutineContext, clock)
- val consumer = object : SimResourceConsumer {
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- return if (now == 0L) {
- ctx.push(0.0)
- 10
- } else {
- ctx.close()
- Long.MAX_VALUE
- }
- }
- }
+ val consumer = SimWorkConsumer(1.0, 1.0)
val logic = spyk(object : SimResourceProviderLogic {})
val context = SimResourceContextImpl(null, interpreter, consumer, logic)
+ context.capacity = 1.0
context.start()
- delay(5)
+ delay(500)
context.invalidate()
- delay(5)
+ delay(500)
context.invalidate()
assertAll(
- { verify(exactly = 2) { logic.onConsume(any(), any(), 0.0, any()) } },
+ { verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } },
{ verify(exactly = 1) { logic.onFinish(any()) } }
)
}