summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-resources/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-resources/src/main')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt13
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt10
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt14
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt47
7 files changed, 62 insertions, 32 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index e5991264..c7fa6a17 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -102,7 +102,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
override val remainingWork: Double
- get() = inputContexts.sumByDouble { it.remainingWork }
+ get() {
+ val now = clock.millis()
+
+ return if (_remainingWorkFlush < now) {
+ _remainingWorkFlush = now
+ _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ } else {
+ _remainingWork
+ }
+ }
+ private var _remainingWork: Double = 0.0
+ private var _remainingWorkFlush: Long = Long.MIN_VALUE
override fun interrupt() {
super.interrupt()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
index 5c5ee038..05ed0714 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceContext.kt
@@ -272,6 +272,7 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = 0.0
+ consumer.onConfirm(this, 0.0)
onIdle(deadline)
}
@@ -283,6 +284,7 @@ public abstract class SimAbstractResourceContext(
require(deadline >= now) { "Deadline already passed" }
speed = min(capacity, limit)
+ consumer.onConfirm(this, speed)
onConsume(work, limit, deadline)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 672a3e9d..38672b13 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.resources
/**
- * A [SimResourceConsumer] characterizes how a [SimResource] is consumed.
+ * A [SimResourceConsumer] characterizes how a resource is consumed.
*
* Implementors of this interface should be considered stateful and must be assumed not to be re-usable (concurrently)
* for multiple resource providers, unless explicitly said otherwise.
@@ -46,6 +46,14 @@ public interface SimResourceConsumer {
public fun onNext(ctx: SimResourceContext): SimResourceCommand
/**
+ * This method is invoked when the resource provider confirms that the consumer is running at the given speed.
+ *
+ * @param ctx The execution context in which the consumer runs.
+ * @param speed The speed at which the consumer runs.
+ */
+ public fun onConfirm(ctx: SimResourceContext, speed: Double) {}
+
+ /**
* This is method is invoked when the capacity of the resource changes.
*
* After being informed of such an event, the consumer might decide to adjust its consumption by interrupting the
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index 9df333e3..dfdd2c2e 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -106,7 +106,7 @@ public class SimResourceDistributorMaxMin(
val output = iterator.next()
// Remove the output from the outputs to prevent ConcurrentModificationException when removing it
- // during the call tou output.close()
+ // during the call to output.close()
iterator.remove()
output.close()
@@ -251,8 +251,8 @@ public class SimResourceDistributorMaxMin(
totalAllocatedWork - totalRemainingWork,
totalOvercommittedWork.toLong(),
totalInterferedWork.toLong(),
- totalRequestedSpeed,
totalAllocatedSpeed,
+ totalRequestedSpeed
)
totalInterferedWork = 0.0
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 9b10edaf..025b0406 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
import org.opendc.utils.TimerScheduler
import java.time.Clock
import kotlin.math.ceil
@@ -42,11 +40,10 @@ public class SimResourceSource(
private val scheduler: TimerScheduler<Any>
) : SimResourceProvider {
/**
- * The resource processing speed over time.
+ * The current processing speed of the resource.
*/
- public val speed: StateFlow<Double>
- get() = _speed
- private val _speed = MutableStateFlow(0.0)
+ public val speed: Double
+ get() = ctx?.speed ?: 0.0
/**
* The capacity of the resource.
@@ -101,8 +98,6 @@ public class SimResourceSource(
*/
private inner class Context(consumer: SimResourceConsumer) : SimAbstractResourceContext(capacity, clock, consumer) {
override fun onIdle(deadline: Long) {
- _speed.value = speed
-
// Do not resume if deadline is "infinite"
if (deadline != Long.MAX_VALUE) {
scheduler.startSingleTimerTo(this, deadline) { flush() }
@@ -110,15 +105,12 @@ public class SimResourceSource(
}
override fun onConsume(work: Double, limit: Double, deadline: Long) {
- _speed.value = speed
-
val until = min(deadline, clock.millis() + getDuration(work, speed))
scheduler.startSingleTimerTo(this, until, ::flush)
}
override fun onFinish(cause: Throwable?) {
- _speed.value = speed
scheduler.cancel(this)
cancel()
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 73f18c7c..de455021 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -124,6 +124,10 @@ public class SimResourceTransformer(
}
}
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate?.onConfirm(ctx, speed)
+ }
+
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
delegate?.onCapacityChanged(ctx, isThrottled)
}
diff --git a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index fd4a9ed5..114c7312 100644
--- a/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/simulator/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -22,8 +22,6 @@
package org.opendc.simulator.resources.consumer
-import kotlinx.coroutines.flow.MutableStateFlow
-import kotlinx.coroutines.flow.StateFlow
import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
@@ -32,37 +30,52 @@ import kotlin.math.min
/**
* Helper class to expose an observable [speed] field describing the speed of the consumer.
*/
-public class SimSpeedConsumerAdapter(private val delegate: SimResourceConsumer) : SimResourceConsumer by delegate {
+public class SimSpeedConsumerAdapter(
+ private val delegate: SimResourceConsumer,
+ private val callback: (Double) -> Unit = {}
+) : SimResourceConsumer by delegate {
/**
- * The resource processing speed over time.
+ * The resource processing speed at this instant.
*/
- public val speed: StateFlow<Double>
- get() = _speed
- private val _speed = MutableStateFlow(0.0)
+ public var speed: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- val command = delegate.onNext(ctx)
+ return delegate.onNext(ctx)
+ }
- when (command) {
- is SimResourceCommand.Idle -> _speed.value = 0.0
- is SimResourceCommand.Consume -> _speed.value = min(ctx.capacity, command.limit)
- is SimResourceCommand.Exit -> _speed.value = 0.0
- }
+ override fun onConfirm(ctx: SimResourceContext, speed: Double) {
+ delegate.onConfirm(ctx, speed)
- return command
+ this.speed = speed
}
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
- val oldSpeed = _speed.value
+ val oldSpeed = speed
delegate.onCapacityChanged(ctx, isThrottled)
// Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
// need to update the current speed.
- if (oldSpeed == _speed.value) {
- _speed.value = min(ctx.capacity, _speed.value)
+ if (oldSpeed == speed) {
+ speed = min(ctx.capacity, speed)
}
}
+ override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
+ super.onFinish(ctx, cause)
+
+ speed = 0.0
+ }
+
override fun toString(): String = "SimSpeedConsumerAdapter[delegate=$delegate]"
}