summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-28 11:58:19 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-03 17:17:37 +0200
commit02fa44c0b116ff51c4cbe2876d8b2a225ed68553 (patch)
tree38562ef2e6f3cde4e46ad5eb32d7573cebaabef6 /opendc-simulator/opendc-simulator-resources/src/main
parentd575bed5418be222e1d3ad39af862e2390596d61 (diff)
refactor(simulator): Add support for pushing flow from context
This change adds a new method to `SimResourceContext` called `push` which allows users to change the requested flow rate directly without having to interrupt the consumer.
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt53
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt8
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt46
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt14
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt18
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt109
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt3
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt9
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt11
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt123
12 files changed, 195 insertions, 213 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index da5c3257..8e0eb5f8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -125,16 +125,21 @@ public abstract class SimAbstractResourceAggregator(
/**
* An input for the resource aggregator.
*/
- public interface Input {
+ public interface Input : AutoCloseable {
/**
* The [SimResourceContext] associated with the input.
*/
public val ctx: SimResourceContext
/**
- * Push the specified [SimResourceCommand] to the input.
+ * Push to this input with the specified [limit] and [duration].
*/
- public fun push(command: SimResourceCommand)
+ public fun push(limit: Double, duration: Long)
+
+ /**
+ * Close the input for further input.
+ */
+ public override fun close()
}
/**
@@ -151,7 +156,12 @@ public abstract class SimAbstractResourceAggregator(
/**
* The resource command to run next.
*/
- private var command: SimResourceCommand? = null
+ private var _duration: Long = Long.MAX_VALUE
+
+ /**
+ * A flag to indicate that the consumer should flush.
+ */
+ private var _isPushed = false
private fun updateCapacity() {
// Adjust capacity of output resource
@@ -159,25 +169,34 @@ public abstract class SimAbstractResourceAggregator(
}
/* Input */
- override fun push(command: SimResourceCommand) {
- this.command = command
- _ctx?.interrupt()
+ override fun push(limit: Double, duration: Long) {
+ _duration = duration
+ val ctx = _ctx
+ if (ctx != null) {
+ ctx.push(limit)
+ ctx.interrupt()
+ }
+ _isPushed = true
+ }
+
+ override fun close() {
+ _duration = Long.MAX_VALUE
+ _isPushed = true
+ _ctx?.close()
}
/* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
- var next = command
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
+ var next = _duration
- return if (next != null) {
- this.command = null
- next
- } else {
+ if (!_isPushed) {
_output.flush()
-
- next = command
- this.command = null
- next ?: SimResourceCommand.Consume(0.0)
+ next = _duration
}
+
+ _isPushed = false
+ _duration = Long.MAX_VALUE
+ return next
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 537be1b5..b258a368 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -41,11 +41,7 @@ public class SimResourceAggregatorMaxMin(
val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
- val command = if (grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedSpeed, duration)
- else
- SimResourceCommand.Consume(0.0, duration)
- input.push(command)
+ input.push(grantedSpeed, duration)
}
}
@@ -53,7 +49,7 @@ public class SimResourceAggregatorMaxMin(
val iterator = consumers.iterator()
for (input in iterator) {
iterator.remove()
- input.push(SimResourceCommand.Exit)
+ input.close()
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
deleted file mode 100644
index 4a980071..00000000
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceCommand.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.resources
-
-/**
- * A SimResourceCommand communicates to a resource how it is consumed by a [SimResourceConsumer].
- */
-public sealed class SimResourceCommand {
- /**
- * A request to the resource to perform work for the specified [duration].
- *
- * @param limit The maximum amount of work to be processed per second.
- * @param duration The duration of the resource consumption in milliseconds.
- */
- public data class Consume(val limit: Double, val duration: Long = Long.MAX_VALUE) : SimResourceCommand() {
- init {
- require(limit >= 0.0) { "Negative limit is not allowed" }
- require(duration >= 0) { "Duration must be positive" }
- }
- }
-
- /**
- * An indication to the resource that the consumer has finished.
- */
- public object Exit : SimResourceCommand()
-}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
index 4d1d2c32..0b25358a 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceConsumer.kt
@@ -30,15 +30,14 @@ package org.opendc.simulator.resources
*/
public interface SimResourceConsumer {
/**
- * This method is invoked when a resource asks for the next [command][SimResourceCommand] to process, either because
- * the resource finished processing, reached its deadline or was interrupted.
+ * This method is invoked when the resource provider is pulling this resource consumer.
*
* @param ctx The execution context in which the consumer runs.
* @param now The virtual timestamp in milliseconds at which the update is occurring.
* @param delta The virtual duration between this call and the last call in milliseconds.
- * @return The next command that the resource should execute.
+ * @return The duration after which the resource consumer should be pulled again.
*/
- public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand
+ public fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long
/**
* This method is invoked when an event has occurred.
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
index f28b43d0..225cae0b 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceContext.kt
@@ -28,7 +28,7 @@ import java.time.Clock
* The execution context in which a [SimResourceConsumer] runs. It facilitates the communication and control between a
* resource and a resource consumer.
*/
-public interface SimResourceContext {
+public interface SimResourceContext : AutoCloseable {
/**
* The virtual clock tracking simulation time.
*/
@@ -53,4 +53,16 @@ public interface SimResourceContext {
* Ask the resource provider to interrupt its resource.
*/
public fun interrupt()
+
+ /**
+ * Push the given flow to this context.
+ *
+ * @param rate The rate of the flow to push.
+ */
+ public fun push(rate: Double)
+
+ /**
+ * Stop the resource context.
+ */
+ public override fun close()
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
index ceaca39a..ba52b597 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceControllableContext.kt
@@ -27,7 +27,7 @@ package org.opendc.simulator.resources
*
* This interface is used by resource providers to control the resource context.
*/
-public interface SimResourceControllableContext : SimResourceContext, AutoCloseable {
+public interface SimResourceControllableContext : SimResourceContext {
/**
* The state of the resource context.
*/
@@ -44,11 +44,6 @@ public interface SimResourceControllableContext : SimResourceContext, AutoClosea
public fun start()
/**
- * Stop the resource context.
- */
- public override fun close()
-
- /**
* Invalidate the resource context's state.
*
* By invalidating the resource context's current state, the state is re-computed and the current progress is
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
index d23c7dbb..eac58410 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceDistributorMaxMin.kt
@@ -54,11 +54,6 @@ public class SimResourceDistributorMaxMin(
private val activeOutputs: MutableList<Output> = mutableListOf()
/**
- * The total amount of work allocated to be executed.
- */
- private var totalAllocatedWork = 0.0
-
- /**
* The total allocated speed for the output resources.
*/
private var totalAllocatedSpeed = 0.0
@@ -97,7 +92,7 @@ public class SimResourceDistributorMaxMin(
}
/* SimResourceConsumer */
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return doNext(ctx, now)
}
@@ -137,10 +132,10 @@ public class SimResourceDistributorMaxMin(
/**
* Schedule the work of the outputs.
*/
- private fun doNext(ctx: SimResourceContext, now: Long): SimResourceCommand {
+ private fun doNext(ctx: SimResourceContext, now: Long): Long {
// If there is no work yet, mark the input as idle.
if (activeOutputs.isEmpty()) {
- return SimResourceCommand.Consume(0.0)
+ return Long.MAX_VALUE
}
val capacity = ctx.capacity
@@ -196,14 +191,11 @@ public class SimResourceDistributorMaxMin(
}
this.totalRequestedSpeed = totalRequestedSpeed
- this.totalAllocatedWork = totalAllocatedWork
val totalAllocatedSpeed = capacity - availableSpeed
this.totalAllocatedSpeed = totalAllocatedSpeed
- return if (totalAllocatedWork > 0.0 && totalAllocatedSpeed > 0.0)
- SimResourceCommand.Consume(totalAllocatedSpeed, duration)
- else
- SimResourceCommand.Consume(0.0, duration)
+ ctx.push(totalAllocatedSpeed)
+ return duration
}
private fun updateCapacity(ctx: SimResourceContext) {
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
index 68bedbd9..f12ef9f1 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceTransformer.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.resources
import org.opendc.simulator.resources.impl.SimResourceCountersImpl
+import java.time.Clock
/**
* A [SimResourceFlow] that transforms the resource commands emitted by the resource commands to the resource provider.
@@ -32,14 +33,9 @@ import org.opendc.simulator.resources.impl.SimResourceCountersImpl
*/
public class SimResourceTransformer(
private val isCoupled: Boolean = false,
- private val transform: (SimResourceContext, SimResourceCommand) -> SimResourceCommand
+ private val transform: (SimResourceContext, Long) -> Long
) : SimResourceFlow, AutoCloseable {
/**
- * The [SimResourceContext] in which the forwarder runs.
- */
- private var ctx: SimResourceContext? = null
-
- /**
* The delegate [SimResourceConsumer].
*/
private var delegate: SimResourceConsumer? = null
@@ -49,17 +45,63 @@ public class SimResourceTransformer(
*/
private var hasDelegateStarted: Boolean = false
+ /**
+ * The exposed [SimResourceContext].
+ */
+ private val ctx = object : SimResourceContext {
+ override val clock: Clock
+ get() = _ctx!!.clock
+
+ override val capacity: Double
+ get() = _ctx?.capacity ?: 0.0
+
+ override val demand: Double
+ get() = _ctx?.demand ?: 0.0
+
+ override val speed: Double
+ get() = _ctx?.speed ?: 0.0
+
+ override fun interrupt() {
+ _ctx?.interrupt()
+ }
+
+ override fun push(rate: Double) {
+ _ctx?.push(rate)
+ _limit = rate
+ }
+
+ override fun close() {
+ val delegate = checkNotNull(delegate) { "Delegate not active" }
+
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onEvent(this, SimResourceEvent.Exit)
+
+ if (isCoupled)
+ _ctx?.close()
+ else
+ _ctx?.push(0.0)
+ }
+ }
+
+ /**
+ * The [SimResourceContext] in which the forwarder runs.
+ */
+ private var _ctx: SimResourceContext? = null
+
override val isActive: Boolean
get() = delegate != null
override val capacity: Double
- get() = ctx?.capacity ?: 0.0
+ get() = ctx.capacity
override val speed: Double
- get() = ctx?.speed ?: 0.0
+ get() = ctx.speed
override val demand: Double
- get() = ctx?.demand ?: 0.0
+ get() = ctx.demand
override val counters: SimResourceCounters
get() = _counters
@@ -75,32 +117,32 @@ public class SimResourceTransformer(
}
override fun interrupt() {
- ctx?.interrupt()
+ ctx.interrupt()
}
override fun cancel() {
val delegate = delegate
- val ctx = ctx
+ val ctx = _ctx
if (delegate != null) {
this.delegate = null
if (ctx != null) {
- delegate.onEvent(ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this.ctx, SimResourceEvent.Exit)
}
}
}
override fun close() {
- val ctx = ctx
+ val ctx = _ctx
if (ctx != null) {
- this.ctx = null
+ this._ctx = null
ctx.interrupt()
}
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val delegate = delegate
if (!hasDelegateStarted) {
@@ -110,54 +152,39 @@ public class SimResourceTransformer(
updateCounters(ctx, delta)
return if (delegate != null) {
- val command = transform(ctx, delegate.onNext(ctx, now, delta))
-
- _limit = if (command is SimResourceCommand.Consume) command.limit else 0.0
-
- if (command == SimResourceCommand.Exit) {
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
-
- delegate.onEvent(ctx, SimResourceEvent.Exit)
-
- if (isCoupled)
- SimResourceCommand.Exit
- else
- onNext(ctx, now, delta)
- } else {
- command
- }
+ val duration = transform(ctx, delegate.onNext(this.ctx, now, delta))
+ _limit = ctx.demand
+ duration
} else {
- SimResourceCommand.Consume(0.0)
+ Long.MAX_VALUE
}
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
when (event) {
SimResourceEvent.Start -> {
- this.ctx = ctx
+ _ctx = ctx
}
SimResourceEvent.Exit -> {
- this.ctx = null
+ _ctx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onEvent(ctx, SimResourceEvent.Exit)
+ delegate.onEvent(this.ctx, SimResourceEvent.Exit)
}
}
- else -> delegate?.onEvent(ctx, event)
+ else -> delegate?.onEvent(this.ctx, event)
}
}
override fun onFailure(ctx: SimResourceContext, cause: Throwable) {
- this.ctx = null
+ _ctx = null
val delegate = delegate
if (delegate != null) {
reset()
- delegate.onFailure(ctx, cause)
+ delegate.onFailure(this.ctx, cause)
}
}
@@ -166,7 +193,7 @@ public class SimResourceTransformer(
*/
private fun start() {
val delegate = delegate ?: return
- delegate.onEvent(checkNotNull(ctx), SimResourceEvent.Start)
+ delegate.onEvent(checkNotNull(_ctx), SimResourceEvent.Start)
hasDelegateStarted = true
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
index 1f8434b7..46885640 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimSpeedConsumerAdapter.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import org.opendc.simulator.resources.SimResourceEvent
@@ -50,7 +49,7 @@ public class SimSpeedConsumerAdapter(
callback(0.0)
}
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
return delegate.onNext(ctx, now, delta)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
index e5173e5f..ad6b0108 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimTraceConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import org.opendc.simulator.resources.SimResourceEvent
@@ -34,13 +33,15 @@ import org.opendc.simulator.resources.SimResourceEvent
public class SimTraceConsumer(private val trace: Sequence<Fragment>) : SimResourceConsumer {
private var iterator: Iterator<Fragment>? = null
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val iterator = checkNotNull(iterator)
return if (iterator.hasNext()) {
val fragment = iterator.next()
- SimResourceCommand.Consume(fragment.usage, fragment.duration)
+ ctx.push(fragment.usage)
+ fragment.duration
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
index ae837043..bf76711f 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/consumer/SimWorkConsumer.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.resources.consumer
-import org.opendc.simulator.resources.SimResourceCommand
import org.opendc.simulator.resources.SimResourceConsumer
import org.opendc.simulator.resources.SimResourceContext
import kotlin.math.roundToLong
@@ -37,12 +36,12 @@ public class SimWorkConsumer(
init {
require(work >= 0.0) { "Work must be positive" }
- require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" }
+ require(utilization > 0.0) { "Utilization must be positive" }
}
private var remainingWork = work
- override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): SimResourceCommand {
+ override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
val actualWork = ctx.speed * delta / 1000.0
val limit = ctx.capacity * utilization
@@ -52,9 +51,11 @@ public class SimWorkConsumer(
val duration = (remainingWork / limit * 1000).roundToLong()
return if (duration > 0) {
- SimResourceCommand.Consume(limit, duration)
+ ctx.push(limit)
+ duration
} else {
- SimResourceCommand.Exit
+ ctx.close()
+ Long.MAX_VALUE
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
index a9507e52..d7ea0043 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt
@@ -86,6 +86,11 @@ internal class SimResourceContextImpl(
private var _deadline: Long = Long.MAX_VALUE
/**
+ * A flag to indicate that an update is active.
+ */
+ private var _updateActive = false
+
+ /**
* The update flag indicating why the update was triggered.
*/
private var _flag: Int = 0
@@ -108,7 +113,9 @@ internal class SimResourceContextImpl(
if (_state != SimResourceState.Stopped) {
interpreter.batch {
_state = SimResourceState.Stopped
- doStop()
+ if (!_updateActive) {
+ doStop()
+ }
}
}
}
@@ -139,6 +146,11 @@ internal class SimResourceContextImpl(
interpreter.scheduleSync(this)
}
+ override fun push(rate: Double) {
+ _speed = min(capacity, rate)
+ _limit = rate
+ }
+
/**
* Determine whether the state of the resource context should be updated.
*/
@@ -151,14 +163,49 @@ internal class SimResourceContextImpl(
* Update the state of the resource context.
*/
fun doUpdate(timestamp: Long) {
+ val oldState = _state
+ if (oldState != SimResourceState.Active) {
+ return
+ }
+
+ _updateActive = true
+
+ val flag = _flag
+ val isInterrupted = flag and FLAG_INTERRUPT != 0
+ val reachedDeadline = _deadline <= timestamp
+ val delta = max(0, timestamp - _timestamp)
+
try {
- val oldState = _state
- val newState = doUpdate(timestamp, oldState)
- _state = newState
+ // Update the resource counters only if there is some progress
+ if (timestamp > _timestamp) {
+ logic.onUpdate(this, delta, _limit, reachedDeadline)
+ }
+
+ // We should only continue processing the next command if:
+ // 1. The resource consumption was finished.
+ // 2. The resource consumer should be interrupted (e.g., someone called .interrupt())
+ val duration = if (reachedDeadline || isInterrupted) {
+ consumer.onNext(this, timestamp, delta)
+ } else {
+ _deadline - timestamp
+ }
+
+ // Reset update flags
_flag = 0
- when (newState) {
+ when (_state) {
+ SimResourceState.Active -> {
+ val limit = _limit
+ push(limit)
+ _duration = duration
+
+ val target = logic.onConsume(this, timestamp, limit, duration)
+
+ _deadline = target
+
+ scheduleUpdate(target)
+ }
SimResourceState.Pending ->
if (oldState != SimResourceState.Pending) {
throw IllegalStateException("Illegal transition to pending state")
@@ -167,12 +214,12 @@ internal class SimResourceContextImpl(
if (oldState != SimResourceState.Stopped) {
doStop()
}
- else -> {}
}
} catch (cause: Throwable) {
doFail(cause)
} finally {
_timestamp = timestamp
+ _updateActive = false
}
}
@@ -185,39 +232,6 @@ internal class SimResourceContextImpl(
override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]"
/**
- * Update the state of the resource context.
- */
- private fun doUpdate(timestamp: Long, state: SimResourceState): SimResourceState {
- return when (state) {
- // Resource context is not active, so its state will not update
- SimResourceState.Pending, SimResourceState.Stopped -> state
- SimResourceState.Active -> {
- val isInterrupted = _flag and FLAG_INTERRUPT != 0
- val reachedDeadline = _deadline <= timestamp
- val delta = max(0, timestamp - _timestamp)
-
- // Update the resource counters only if there is some progress
- if (timestamp > _timestamp) {
- logic.onUpdate(this, delta, _limit, reachedDeadline)
- }
-
- // We should only continue processing the next command if:
- // 1. The resource consumption was finished.
- // 2. The resource capacity cannot satisfy the demand.
- // 3. The resource consumer should be interrupted (e.g., someone called .interrupt())
- if (reachedDeadline || isInterrupted) {
- when (val command = consumer.onNext(this, timestamp, delta)) {
- is SimResourceCommand.Consume -> interpretConsume(timestamp, command.limit, command.duration)
- is SimResourceCommand.Exit -> interpretExit()
- }
- } else {
- interpretConsume(timestamp, _limit, _duration - delta)
- }
- }
- }
- }
-
- /**
* Stop the resource context.
*/
private fun doStop() {
@@ -226,6 +240,8 @@ internal class SimResourceContextImpl(
logic.onFinish(this)
} catch (cause: Throwable) {
doFail(cause)
+ } finally {
+ _deadline = Long.MAX_VALUE
}
}
@@ -244,35 +260,6 @@ internal class SimResourceContextImpl(
}
/**
- * Interpret the [SimResourceCommand.Consume] command.
- */
- private fun interpretConsume(now: Long, limit: Double, duration: Long): SimResourceState {
- _speed = min(capacity, limit)
- _limit = limit
- _duration = duration
-
- val timestamp = logic.onConsume(this, now, limit, duration)
-
- _deadline = timestamp
-
- scheduleUpdate(timestamp)
-
- return SimResourceState.Active
- }
-
- /**
- * Interpret the [SimResourceCommand.Exit] command.
- */
- private fun interpretExit(): SimResourceState {
- _speed = 0.0
- _limit = 0.0
- _duration = Long.MAX_VALUE
- _deadline = Long.MAX_VALUE
-
- return SimResourceState.Stopped
- }
-
- /**
* Indicate that the capacity of the resource has changed.
*/
private fun onCapacityChange() {