summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-resources
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-resources')
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt147
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt38
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt7
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt2
4 files changed, 90 insertions, 104 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index c7fa6a17..f4459c54 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -29,24 +29,6 @@ import java.time.Clock
*/
public abstract class SimAbstractResourceAggregator(private val clock: Clock) : SimResourceAggregator {
/**
- * The available resource provider contexts.
- */
- protected val inputContexts: Set<SimResourceContext>
- get() = _inputContexts
- private val _inputContexts = mutableSetOf<SimResourceContext>()
-
- /**
- * The output context.
- */
- protected val outputContext: SimResourceContext
- get() = context
-
- /**
- * The commands to submit to the underlying input resources.
- */
- protected val commands: MutableMap<SimResourceContext, SimResourceCommand> = mutableMapOf()
-
- /**
* This method is invoked when the resource consumer consumes resources.
*/
protected abstract fun doConsume(work: Double, limit: Double, deadline: Long)
@@ -54,37 +36,29 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
/**
* This method is invoked when the resource consumer enters an idle state.
*/
- protected open fun doIdle(deadline: Long) {
- for (input in inputContexts) {
- commands[input] = SimResourceCommand.Idle(deadline)
- }
- }
+ protected abstract fun doIdle(deadline: Long)
/**
* This method is invoked when the resource consumer finishes processing.
*/
- protected open fun doFinish(cause: Throwable?) {
- for (input in inputContexts) {
- commands[input] = SimResourceCommand.Exit
- }
- }
+ protected abstract fun doFinish(cause: Throwable?)
/**
* This method is invoked when an input context is started.
*/
- protected open fun onContextStarted(ctx: SimResourceContext) {
- _inputContexts.add(ctx)
- }
+ protected abstract fun onInputStarted(input: Input)
- protected open fun onContextFinished(ctx: SimResourceContext) {
- assert(_inputContexts.remove(ctx)) { "Lost context" }
- }
+ /**
+ * This method is invoked when an input is stopped.
+ */
+ protected abstract fun onInputFinished(input: Input)
override fun addInput(input: SimResourceProvider) {
check(output.state != SimResourceState.Stopped) { "Aggregator has been stopped" }
val consumer = Consumer()
_inputs.add(input)
+ _inputConsumers.add(consumer)
input.startConsumer(consumer)
}
@@ -99,15 +73,18 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
override val inputs: Set<SimResourceProvider>
get() = _inputs
private val _inputs = mutableSetOf<SimResourceProvider>()
+ private val _inputConsumers = mutableListOf<Consumer>()
- private val context = object : SimAbstractResourceContext(inputContexts.sumByDouble { it.capacity }, clock, _output) {
+ protected val outputContext: SimResourceContext
+ get() = context
+ private val context = object : SimAbstractResourceContext(0.0, clock, _output) {
override val remainingWork: Double
get() {
val now = clock.millis()
return if (_remainingWorkFlush < now) {
_remainingWorkFlush = now
- _inputContexts.sumByDouble { it.remainingWork }.also { _remainingWork = it }
+ _inputConsumers.sumByDouble { it._ctx?.remainingWork ?: 0.0 }.also { _remainingWork = it }
} else {
_remainingWork
}
@@ -115,12 +92,6 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
private var _remainingWork: Double = 0.0
private var _remainingWorkFlush: Long = Long.MIN_VALUE
- override fun interrupt() {
- super.interrupt()
-
- interruptAll()
- }
-
override fun onConsume(work: Double, limit: Double, deadline: Long) = doConsume(work, limit, deadline)
override fun onIdle(deadline: Long) = doIdle(deadline)
@@ -129,80 +100,80 @@ public abstract class SimAbstractResourceAggregator(private val clock: Clock) :
doFinish(cause)
super.onFinish(cause)
-
- interruptAll()
}
}
/**
- * A flag to indicate that an interrupt is active.
- */
- private var isInterrupting: Boolean = false
-
- /**
- * Schedule the work over the input resources.
+ * An input for the resource aggregator.
*/
- private fun doSchedule() {
- context.flush(isIntermediate = true)
- interruptAll()
+ public interface Input {
+ /**
+ * The [SimResourceContext] associated with the input.
+ */
+ public val ctx: SimResourceContext
+
+ /**
+ * Push the specified [SimResourceCommand] to the input.
+ */
+ public fun push(command: SimResourceCommand)
}
/**
- * Interrupt all inputs.
+ * An internal [SimResourceConsumer] implementation for aggregator inputs.
*/
- private fun interruptAll() {
- // Prevent users from interrupting the resource while they are constructing their next command, as this will
- // only lead to infinite recursion.
- if (isInterrupting) {
- return
+ private inner class Consumer : Input, SimResourceConsumer {
+ /**
+ * The resource context associated with the input.
+ */
+ override val ctx: SimResourceContext
+ get() = _ctx!!
+ var _ctx: SimResourceContext? = null
+
+ /**
+ * The resource command to run next.
+ */
+ private var command: SimResourceCommand? = null
+
+ /* Input */
+ override fun push(command: SimResourceCommand) {
+ this.command = command
+ _ctx?.interrupt()
}
- try {
- isInterrupting = true
-
- val iterator = _inputs.iterator()
- while (iterator.hasNext()) {
- val input = iterator.next()
- input.interrupt()
-
- if (input.state != SimResourceState.Active) {
- iterator.remove()
- }
- }
- } finally {
- isInterrupting = false
- }
- }
-
- /**
- * An internal [SimResourceConsumer] implementation for aggregator inputs.
- */
- private inner class Consumer : SimResourceConsumer {
+ /* SimResourceConsumer */
override fun onStart(ctx: SimResourceContext) {
- onContextStarted(ctx)
+ _ctx = ctx
onCapacityChanged(ctx, false)
// Make sure we initialize the output if we have not done so yet
if (context.state == SimResourceState.Pending) {
context.start()
}
+
+ onInputStarted(this)
}
override fun onNext(ctx: SimResourceContext): SimResourceCommand {
- doSchedule()
-
- return commands[ctx] ?: SimResourceCommand.Idle()
+ var next = command
+
+ return if (next != null) {
+ this.command = null
+ next
+ } else {
+ context.flush(isIntermediate = true)
+ next = command
+ this.command = null
+ next ?: SimResourceCommand.Idle()
+ }
}
override fun onCapacityChanged(ctx: SimResourceContext, isThrottled: Boolean) {
// Adjust capacity of output resource
- context.capacity = inputContexts.sumByDouble { it.capacity }
+ context.capacity = _inputConsumers.sumByDouble { it._ctx?.capacity ?: 0.0 }
}
override fun onFinish(ctx: SimResourceContext, cause: Throwable?) {
- onContextFinished(ctx)
-
- super.onFinish(ctx, cause)
+ onInputFinished(this)
}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index 08bc064e..5d550ad8 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -28,36 +28,46 @@ import java.time.Clock
* A [SimResourceAggregator] that distributes the load equally across the input resources.
*/
public class SimResourceAggregatorMaxMin(clock: Clock) : SimAbstractResourceAggregator(clock) {
- private val consumers = mutableListOf<SimResourceContext>()
+ private val consumers = mutableListOf<Input>()
override fun doConsume(work: Double, limit: Double, deadline: Long) {
// Sort all consumers by their capacity
- consumers.sortWith(compareBy { it.capacity })
+ consumers.sortWith(compareBy { it.ctx.capacity })
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
- val inputCapacity = input.capacity
+ val inputCapacity = input.ctx.capacity
val fraction = inputCapacity / outputContext.capacity
val grantedSpeed = limit * fraction
val grantedWork = fraction * work
- commands[input] =
- if (grantedWork > 0.0 && grantedSpeed > 0.0)
- SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
- else
- SimResourceCommand.Idle(deadline)
+ val command = if (grantedWork > 0.0 && grantedSpeed > 0.0)
+ SimResourceCommand.Consume(grantedWork, grantedSpeed, deadline)
+ else
+ SimResourceCommand.Idle(deadline)
+ input.push(command)
}
}
- override fun onContextStarted(ctx: SimResourceContext) {
- super.onContextStarted(ctx)
+ override fun doIdle(deadline: Long) {
+ for (input in consumers) {
+ input.push(SimResourceCommand.Idle(deadline))
+ }
+ }
- consumers.add(ctx)
+ override fun doFinish(cause: Throwable?) {
+ val iterator = consumers.iterator()
+ for (input in iterator) {
+ iterator.remove()
+ input.push(SimResourceCommand.Exit)
+ }
}
- override fun onContextFinished(ctx: SimResourceContext) {
- super.onContextFinished(ctx)
+ override fun onInputStarted(input: Input) {
+ consumers.add(input)
+ }
- consumers.remove(ctx)
+ override fun onInputFinished(input: Input) {
+ consumers.remove(input)
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
index 025b0406..157db3cb 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceSource.kt
@@ -112,7 +112,12 @@ public class SimResourceSource(
override fun onFinish(cause: Throwable?) {
scheduler.cancel(this)
- cancel()
+
+ ctx = null
+
+ if (this@SimResourceSource.state != SimResourceState.Stopped) {
+ this@SimResourceSource.state = SimResourceState.Pending
+ }
super.onFinish(cause)
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
index e272abb8..e78bcdac 100644
--- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMinTest.kt
@@ -139,7 +139,7 @@ internal class SimResourceAggregatorMaxMinTest {
val consumer = mockk<SimResourceConsumer>(relaxUnitFun = true)
every { consumer.onNext(any()) }
.returns(SimResourceCommand.Consume(1.0, 1.0))
- .andThenThrows(IllegalStateException())
+ .andThenThrows(IllegalStateException("Test Exception"))
try {
assertThrows<IllegalStateException> { aggregator.output.consume(consumer) }