summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt4
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt58
2 files changed, 34 insertions, 28 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index f12ef9f1..a317f832 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -152,9 +152,7 @@ public class SimResourceTransformer(
updateCounters(ctx, delta)
return if (delegate != null) {
- val duration = transform(ctx, delegate.onNext(this.ctx, now, delta))
- _limit = ctx.demand
- duration
+ transform(ctx, delegate.onNext(this.ctx, now, delta))
} else {
Long.MAX_VALUE
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index 1ac38946..78d79434 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -69,8 +69,8 @@ internal class SimResourceContextImpl(
* The current processing speed of the resource.
*/
override val speed: Double
- get() = _speed
- private var _speed = 0.0
+ get() = _rate
+ private var _rate = 0.0
/**
* The current resource processing demand.
@@ -81,10 +81,10 @@ internal class SimResourceContextImpl(
/**
* The current state of the resource context.
*/
- private var _timestamp: Long = Long.MIN_VALUE
private var _limit: Double = 0.0
- private var _duration: Long = Long.MAX_VALUE
- private var _deadline: Long = Long.MAX_VALUE
+ private var _activeLimit: Double = 0.0
+ private var _deadline: Long = Long.MIN_VALUE
+ private var _lastUpdate: Long = Long.MIN_VALUE
/**
* A flag to indicate that an update is active.
@@ -152,8 +152,13 @@ internal class SimResourceContextImpl(
return
}
- _speed = min(capacity, rate)
_limit = rate
+
+ // Invalidate only if the active limit is change and no update is active
+ // If an update is active, it will already get picked up at the end of the update
+ if (_activeLimit != rate && !_updateActive) {
+ invalidate()
+ }
}
/**
@@ -173,45 +178,47 @@ internal class SimResourceContextImpl(
return
}
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
_updateActive = true
val reachedDeadline = _deadline <= now
- val delta = max(0, now - _timestamp)
+ val delta = max(0, now - lastUpdate)
try {
// Update the resource counters only if there is some progress
- if (now > _timestamp) {
- logic.onUpdate(this, delta, _limit, reachedDeadline)
+ if (now > lastUpdate) {
+ logic.onUpdate(this, delta, _activeLimit, reachedDeadline)
}
val duration = consumer.onNext(this, now, delta)
+ val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration
// Reset update flags
_flag = 0
+ // Check whether the state has changed after [consumer.onNext]
when (_state) {
SimResourceState.Active -> {
- val target = logic.onConsume(this, now, _limit, duration)
-
- _speed = min(capacity, _limit)
- _duration = duration
- _deadline = target
+ logic.onConsume(this, now, _limit, duration)
- scheduleUpdate(now, target)
+ // Schedule an update at the new deadline
+ scheduleUpdate(now, newDeadline)
}
- SimResourceState.Pending ->
- if (oldState != SimResourceState.Pending) {
- throw IllegalStateException("Illegal transition to pending state")
- }
- SimResourceState.Stopped ->
- if (oldState != SimResourceState.Stopped) {
- doStop()
- }
+ SimResourceState.Stopped -> doStop()
+ SimResourceState.Pending -> throw IllegalStateException("Illegal transition to pending state")
}
+
+ // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value
+ val newLimit = _limit
+
+ // Flush the changes to the flow
+ _activeLimit = newLimit
+ _deadline = newDeadline
+ _rate = min(capacity, newLimit)
} catch (cause: Throwable) {
doFail(cause)
} finally {
- _timestamp = now
_updateActive = false
}
}
@@ -246,7 +253,7 @@ internal class SimResourceContextImpl(
}
}
- override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]"
+ override fun toString(): String = "SimResourceContextImpl[capacity=$capacity,rate=$_rate]"
/**
* Stop the resource context.
@@ -259,6 +266,7 @@ internal class SimResourceContextImpl(
doFail(cause)
} finally {
_deadline = Long.MAX_VALUE
+ _limit = 0.0
}
}