summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt81
-rw-r--r--opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt15
2 files changed, 16 insertions, 80 deletions
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
index 8e0eb5f8..d064d7fa 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimAbstractResourceAggregator.kt
@@ -32,7 +32,7 @@ public abstract class SimAbstractResourceAggregator(
/**
* This method is invoked when the resource consumer consumes resources.
*/
- protected abstract fun doConsume(limit: Double, duration: Long)
+ protected abstract fun doConsume(limit: Double)
/**
* This method is invoked when the resource consumer finishes processing.
@@ -42,12 +42,12 @@ public abstract class SimAbstractResourceAggregator(
/**
* This method is invoked when an input context is started.
*/
- protected abstract fun onInputStarted(input: Input)
+ protected abstract fun onInputStarted(input: SimResourceContext)
/**
* This method is invoked when an input is stopped.
*/
- protected abstract fun onInputFinished(input: Input)
+ protected abstract fun onInputFinished(input: SimResourceContext)
/* SimResourceAggregator */
override fun addInput(input: SimResourceProvider) {
@@ -95,7 +95,7 @@ public abstract class SimAbstractResourceAggregator(
return object : SimResourceProviderLogic {
override fun onConsume(ctx: SimResourceControllableContext, now: Long, limit: Double, duration: Long): Long {
- doConsume(limit, duration)
+ doConsume(limit)
return super.onConsume(ctx, now, limit, duration)
}
@@ -113,90 +113,25 @@ public abstract class SimAbstractResourceAggregator(
}
}
}
-
- /**
- * Flush the progress of the output if possible.
- */
- fun flush() {
- ctx?.flush()
- }
- }
-
- /**
- * An input for the resource aggregator.
- */
- public interface Input : AutoCloseable {
- /**
- * The [SimResourceContext] associated with the input.
- */
- public val ctx: SimResourceContext
-
- /**
- * Push to this input with the specified [limit] and [duration].
- */
- public fun push(limit: Double, duration: Long)
-
- /**
- * Close the input for further input.
- */
- public override fun close()
}
/**
* An internal [SimResourceConsumer] implementation for aggregator inputs.
*/
- private inner class Consumer : Input, SimResourceConsumer {
+ private inner class Consumer : SimResourceConsumer {
/**
* The resource context associated with the input.
*/
- override val ctx: SimResourceContext
- get() = _ctx!!
private var _ctx: SimResourceContext? = null
- /**
- * The resource command to run next.
- */
- private var _duration: Long = Long.MAX_VALUE
-
- /**
- * A flag to indicate that the consumer should flush.
- */
- private var _isPushed = false
-
private fun updateCapacity() {
// Adjust capacity of output resource
_output.capacity = _inputConsumers.sumOf { it._ctx?.capacity ?: 0.0 }
}
- /* Input */
- override fun push(limit: Double, duration: Long) {
- _duration = duration
- val ctx = _ctx
- if (ctx != null) {
- ctx.push(limit)
- ctx.interrupt()
- }
- _isPushed = true
- }
-
- override fun close() {
- _duration = Long.MAX_VALUE
- _isPushed = true
- _ctx?.close()
- }
-
/* SimResourceConsumer */
override fun onNext(ctx: SimResourceContext, now: Long, delta: Long): Long {
- var next = _duration
-
- if (!_isPushed) {
- _output.flush()
- next = _duration
- }
-
- _isPushed = false
- _duration = Long.MAX_VALUE
- return next
+ return Long.MAX_VALUE
}
override fun onEvent(ctx: SimResourceContext, event: SimResourceEvent) {
@@ -205,10 +140,10 @@ public abstract class SimAbstractResourceAggregator(
_ctx = ctx
updateCapacity()
- onInputStarted(this)
+ onInputStarted(ctx)
}
SimResourceEvent.Capacity -> updateCapacity()
- SimResourceEvent.Exit -> onInputFinished(this)
+ SimResourceEvent.Exit -> onInputFinished(ctx)
else -> {}
}
}
diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
index b258a368..f131ac6c 100644
--- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
+++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/SimResourceAggregatorMaxMin.kt
@@ -29,19 +29,20 @@ public class SimResourceAggregatorMaxMin(
interpreter: SimResourceInterpreter,
parent: SimResourceSystem? = null
) : SimAbstractResourceAggregator(interpreter, parent) {
- private val consumers = mutableListOf<Input>()
+ private val consumers = mutableListOf<SimResourceContext>()
- override fun doConsume(limit: Double, duration: Long) {
+ override fun doConsume(limit: Double) {
// Sort all consumers by their capacity
- consumers.sortWith(compareBy { it.ctx.capacity })
+ consumers.sortWith(compareBy { it.capacity })
// Divide the requests over the available capacity of the input resources fairly
for (input in consumers) {
- val inputCapacity = input.ctx.capacity
+ val inputCapacity = input.capacity
val fraction = inputCapacity / capacity
val grantedSpeed = limit * fraction
- input.push(grantedSpeed, duration)
+ input.push(grantedSpeed)
+ input.interrupt()
}
}
@@ -53,11 +54,11 @@ public class SimResourceAggregatorMaxMin(
}
}
- override fun onInputStarted(input: Input) {
+ override fun onInputStarted(input: SimResourceContext) {
consumers.add(input)
}
- override fun onInputFinished(input: Input) {
+ override fun onInputFinished(input: SimResourceContext) {
consumers.remove(input)
}
}