summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-08 22:02:07 +0200
committerGitHub <noreply@github.com>2021-10-08 22:02:07 +0200
commitb7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (patch)
treeb88bbede15108c6855d7f94ded4c7054df186a72 /opendc-simulator/opendc-simulator-flow/src
parent3098eeb116a80ce12e6575e454d0448867478792 (diff)
parente2f002358e9d5be2239fa2cb7ca92c9c96a21b6f (diff)
merge: Performance improvements for flow engine
This pull request applies multiple performance improvements for the flow engine and compute simulator. * Optimize SimTraceWorkload (by storing fragments using several arrays) * Skip fair-share algorithm if capacity remaining * Count interference for multiplexer inputs * Simplify FlowSink implementation * Do not update outputs if rate is unchanged * Eliminate ArrayList iteration overhead * Optimize clock storage * Specialize FlowEngine queues * Eliminate clock access in hot path **Breaking API Changes** * `SimTraceWorkload` now accepts a `SimTrace` as parameter. This trace can be constructed using fragments or directly using builder class. Internally, the trace is now stored using several arrays.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt147
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt7
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt34
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt116
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt195
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt)28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt263
12 files changed, 702 insertions, 321 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
deleted file mode 100644
index b02426e3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.FlowCountersImpl
-
-/**
- * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
- */
-public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer {
- /**
- * A flag to indicate that the flow consumer is active.
- */
- public override val isActive: Boolean
- get() = ctx != null
-
- /**
- * The capacity of the consumer.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
- /**
- * The current processing rate of the consumer.
- */
- public override val rate: Double
- get() = ctx?.rate ?: 0.0
-
- /**
- * The flow processing rate demand at this instant.
- */
- public override val demand: Double
- get() = ctx?.demand ?: 0.0
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public override val counters: FlowCounters
- get() = _counters
- private val _counters = FlowCountersImpl()
-
- /**
- * The [FlowConsumerContext] that is currently running.
- */
- protected var ctx: FlowConsumerContext? = null
- private set
-
- /**
- * Construct the [FlowConsumerLogic] instance for a new source.
- */
- protected abstract fun createLogic(): FlowConsumerLogic
-
- /**
- * Start the specified [FlowConsumerContext].
- */
- protected open fun start(ctx: FlowConsumerContext) {
- ctx.start()
- }
-
- /**
- * The previous demand for the consumer.
- */
- private var _previousDemand = 0.0
- private var _previousCapacity = 0.0
-
- /**
- * Update the counters of the flow consumer.
- */
- protected fun updateCounters(ctx: FlowConnection, delta: Long) {
- val demand = _previousDemand
- val capacity = _previousCapacity
-
- _previousDemand = ctx.demand
- _previousCapacity = ctx.capacity
-
- if (delta <= 0) {
- return
- }
-
- val counters = _counters
- val deltaS = delta / 1000.0
- val total = demand * deltaS
- val work = capacity * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.demand += work
- counters.actual += actualWork
- counters.remaining += (total - actualWork)
- }
-
- /**
- * Update the counters of the flow consumer.
- */
- protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
- val counters = _counters
- counters.demand += demand
- counters.actual += actual
- counters.remaining += remaining
- }
-
- final override fun startConsumer(source: FlowSource) {
- check(ctx == null) { "Consumer is in invalid state" }
- val ctx = engine.newContext(source, createLogic())
-
- ctx.capacity = capacity
- this.ctx = ctx
-
- start(ctx)
- }
-
- final override fun pull() {
- ctx?.pull()
- }
-
- final override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.close()
- }
- }
-
- override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]"
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
index c327e1e9..8ff0bc76 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -52,6 +52,13 @@ public interface FlowConnection : AutoCloseable {
public fun pull()
/**
+ * Pull the source.
+ *
+ * @param now The timestamp at which the connection is pulled.
+ */
+ public fun pull(now: Long)
+
+ /**
* Push the given flow [rate] over this connection.
*
* @param rate The rate of the flow to push.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
index d7182497..98922ab3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -55,6 +55,8 @@ public interface FlowConsumerContext : FlowConnection {
/**
* Synchronously pull the source of the connection.
+ *
+ * @param now The timestamp at which the connection is pulled.
*/
- public fun pullSync()
+ public fun pullSync(now: Long)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 7eaaf6c2..e3bdd7ba 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -23,7 +23,8 @@
package org.opendc.simulator.flow
import mu.KotlinLogging
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
/**
@@ -71,6 +72,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
_innerCtx?.pull()
}
+ override fun pull(now: Long) {
+ _innerCtx?.pull(now)
+ }
+
@JvmField var lastPull = Long.MAX_VALUE
override fun push(rate: Double) {
@@ -117,7 +122,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
override val counters: FlowCounters
get() = _counters
- private val _counters = FlowCountersImpl()
+ private val _counters = MutableFlowCounters()
override fun startConsumer(source: FlowSource) {
check(delegate == null) { "Forwarder already active" }
@@ -241,12 +246,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
val counters = _counters
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
- counters.demand += work
- counters.actual += actualWork
- counters.remaining += (total - actualWork)
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index b4eb6a38..e9094443 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.flow
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
+
/**
* A [FlowSink] represents a sink with a fixed capacity.
*
@@ -33,38 +36,120 @@ public class FlowSink(
private val engine: FlowEngine,
initialCapacity: Double,
private val parent: FlowConvergenceListener? = null
-) : AbstractFlowConsumer(engine, initialCapacity) {
+) : FlowConsumer {
+ /**
+ * A flag to indicate that the flow consumer is active.
+ */
+ public override val isActive: Boolean
+ get() = _ctx != null
+
+ /**
+ * The capacity of the consumer.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ _ctx?.capacity = value
+ }
+
+ /**
+ * The current processing rate of the consumer.
+ */
+ public override val rate: Double
+ get() = _ctx?.rate ?: 0.0
+
+ /**
+ * The flow processing rate demand at this instant.
+ */
+ public override val demand: Double
+ get() = _ctx?.demand ?: 0.0
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
+
+ /**
+ * The current active [FlowConsumerLogic] of this sink.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ override fun startConsumer(source: FlowSource) {
+ check(_ctx == null) { "Consumer is in invalid state" }
- override fun start(ctx: FlowConsumerContext) {
+ val ctx = engine.newContext(source, Logic(parent, _counters))
+ _ctx = ctx
+
+ ctx.capacity = capacity
if (parent != null) {
ctx.shouldConsumerConverge = true
}
- super.start(ctx)
+
+ ctx.start()
}
- override fun createLogic(): FlowConsumerLogic {
- return object : FlowConsumerLogic {
- private val parent = this@FlowSink.parent
-
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- delta: Long,
- rate: Double
- ) {
- updateCounters(ctx, delta)
- }
+ override fun pull() {
+ _ctx?.pull()
+ }
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- updateCounters(ctx, delta)
- cancel()
- }
+ override fun cancel() {
+ _ctx?.close()
+ }
+
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
+
+ /**
+ * [FlowConsumerLogic] of a sink.
+ */
+ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ updateCounters(ctx, delta, rate, ctx.capacity)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ updateCounters(ctx, delta, 0.0, 0.0)
+
+ _ctx = null
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now, delta)
+ }
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ /**
+ * The previous demand and capacity for the consumer.
+ */
+ private val _previous = DoubleArray(2)
+
+ /**
+ * Update the counters of the flow consumer.
+ */
+ private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ val counters = counters
+ val previous = _previous
+ val demand = previous[0]
+ val capacity = previous[1]
+
+ previous[0] = nextDemand
+ previous[1] = nextCapacity
+
+ if (delta <= 0) {
+ return
}
+
+ val deltaS = delta * D_MS_TO_S
+ val total = demand * deltaS
+ val work = capacity * deltaS
+ val actualWork = ctx.rate * deltaS
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
}
}
-
- override fun toString(): String = "FlowSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
new file mode 100644
index 00000000..450195ec
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Constant for converting milliseconds into seconds.
+ */
+internal const val D_MS_TO_S = 1 / 1000.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 9a568897..58ca918b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl(
/**
* The timers at which the context is scheduled to be interrupted.
*/
- private var _timer: FlowEngineImpl.Timer? = null
- private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5)
+ private var _timer: Long = Long.MAX_VALUE
+ private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
override fun start() {
check(_flags and ConnState == ConnPending) { "Consumer is already started" }
@@ -164,17 +164,21 @@ internal class FlowConsumerContextImpl(
}
}
- override fun pull() {
+ override fun pull(now: Long) {
val flags = _flags
if (flags and ConnState != ConnActive) {
return
}
// Mark connection as pulled
- scheduleImmediate(_clock.millis(), flags or ConnPulled)
+ scheduleImmediate(now, flags or ConnPulled)
}
- override fun pullSync() {
+ override fun pull() {
+ pull(_clock.millis())
+ }
+
+ override fun pullSync(now: Long) {
val flags = _flags
// Do not attempt to flush the connection if the connection is closed or an update is already active
@@ -182,8 +186,6 @@ internal class FlowConsumerContextImpl(
return
}
- val now = _clock.millis()
-
if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
engine.scheduleSync(now, this)
}
@@ -217,8 +219,8 @@ internal class FlowConsumerContextImpl(
*/
fun doUpdate(
now: Long,
- visited: ArrayDeque<FlowConsumerContextImpl>,
- timerQueue: PriorityQueue<FlowEngineImpl.Timer>,
+ visited: FlowDeque,
+ timerQueue: FlowTimerQueue,
isImmediate: Boolean
) {
var flags = _flags
@@ -326,8 +328,7 @@ internal class FlowConsumerContextImpl(
// Prune the head timer if this is a delayed update
val timer = if (!isImmediate) {
// Invariant: Any pending timer should only point to a future timestamp
- // See also `scheduleDelayed`
- val timer = pendingTimers.poll()
+ val timer = pendingTimers.poll() ?: Long.MAX_VALUE
_timer = timer
timer
} else {
@@ -342,7 +343,7 @@ internal class FlowConsumerContextImpl(
if (newDeadline == Long.MAX_VALUE ||
flags and ConnState != ConnActive ||
flags and ConnDisableTimers != 0 ||
- (timer != null && newDeadline >= timer.target)
+ (timer != Long.MAX_VALUE && newDeadline >= timer)
) {
// Ignore any deadline scheduled at the maximum value
// This indicates that the source does not want to register a timer
@@ -350,12 +351,11 @@ internal class FlowConsumerContextImpl(
}
// Construct a timer for the new deadline and add it to the global queue of timers
- val newTimer = FlowEngineImpl.Timer(this, newDeadline)
- _timer = newTimer
- timerQueue.add(newTimer)
+ _timer = newDeadline
+ timerQueue.add(this, newDeadline)
- // A timer already exists for this connection, so add it to the queue of pending timers
- if (timer != null) {
+ // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
+ if (timer != Long.MAX_VALUE) {
pendingTimers.addFirst(timer)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
new file mode 100644
index 00000000..c6cba4b7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import java.util.*
+
+/**
+ * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations.
+ */
+internal class FlowDeque(initialCapacity: Int = 256) {
+ /**
+ * The array of elements in the queue.
+ */
+ private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
+ private var _head = 0
+ private var _tail = 0
+
+ /**
+ * Determine whether this queue is not empty.
+ */
+ fun isNotEmpty(): Boolean {
+ return _head != _tail
+ }
+
+ /**
+ * Add the specified [ctx] to the queue.
+ */
+ fun add(ctx: FlowConsumerContextImpl) {
+ val es = _elements
+ var tail = _tail
+
+ es[tail] = ctx
+
+ tail = inc(tail, es.size)
+ _tail = tail
+
+ if (_head == tail) {
+ doubleCapacity()
+ }
+ }
+
+ /**
+ * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
+ */
+ fun poll(): FlowConsumerContextImpl? {
+ val es = _elements
+ val head = _head
+ val ctx = es[head]
+
+ if (ctx != null) {
+ es[head] = null
+ _head = inc(head, es.size)
+ }
+
+ return ctx
+ }
+
+ /**
+ * Clear the queue.
+ */
+ fun clear() {
+ _elements.fill(null)
+ _head = 0
+ _tail = 0
+ }
+
+ private fun inc(i: Int, modulus: Int): Int {
+ var x = i
+ if (++x >= modulus) {
+ x = 0
+ }
+ return x
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private fun doubleCapacity() {
+ assert(_head == _tail)
+ val p = _head
+ val n = _elements.size
+ val r = n - p // number of elements to the right of p
+
+ val newCapacity = n shl 1
+ check(newCapacity >= 0) { "Sorry, deque too big" }
+
+ val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
+
+ _elements.copyInto(a, 0, p, r)
+ _elements.copyInto(a, r, 0, p)
+
+ _elements = a
+ _head = 0
+ _tail = n
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index a9234abf..3c79d54e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
-internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable {
+internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable {
/**
* The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
@@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/**
* The queue of connection updates that are scheduled for immediate execution.
*/
- private val queue = ArrayDeque<FlowConsumerContextImpl>()
+ private val queue = FlowDeque()
/**
* A priority queue containing the connection updates to be scheduled in the future.
*/
- private val futureQueue = PriorityQueue<Timer>()
+ private val futureQueue = FlowTimerQueue()
/**
* The stack of engine invocations to occur in the future.
@@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/**
* The systems that have been visited during the engine cycle.
*/
- private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque()
+ private val visited = FlowDeque()
/**
* The index in the batch stack.
@@ -71,6 +71,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
private var batchIndex = 0
/**
+ * The virtual [Clock] of this engine.
+ */
+ override val clock: Clock
+ get() = _clock
+ private val _clock: Clock = clock
+
+ /**
* Update the specified [ctx] synchronously.
*/
fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
@@ -113,7 +120,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
try {
// Flush the work if the engine is not already running
if (batchIndex == 1 && queue.isNotEmpty()) {
- doRunEngine(clock.millis())
+ doRunEngine(_clock.millis())
}
} finally {
batchIndex--
@@ -122,11 +129,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/* Runnable */
override fun run() {
- val now = clock.millis()
val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
- assert(now >= invocation.timestamp) { "Future invocations invariant violated" }
-
- doRunEngine(now)
+ doRunEngine(invocation.timestamp)
}
/**
@@ -144,17 +148,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
// Execute all scheduled updates at current timestamp
while (true) {
- val timer = futureQueue.peek() ?: break
- val target = timer.target
-
- if (target > now) {
- break
- }
-
- assert(target >= now) { "Internal inconsistency: found update of the past" }
-
- futureQueue.poll()
- timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
+ val ctx = futureQueue.poll(now) ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
}
// Repeat execution of all immediate updates until the system has converged to a steady-state
@@ -177,9 +172,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
}
// Schedule an engine invocation for the next update to occur.
- val headTimer = futureQueue.peek()
- if (headTimer != null) {
- trySchedule(now, futureInvocations, headTimer.target)
+ val headDeadline = futureQueue.peekDeadline()
+ if (headDeadline != Long.MAX_VALUE) {
+ trySchedule(now, futureInvocations, headDeadline)
}
}
@@ -217,17 +212,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
*/
fun cancel() = handle.dispose()
}
-
- /**
- * An update call for [ctx] that is scheduled for [target].
- *
- * This class represents an update in the future at [target] requested by [ctx].
- */
- class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> {
- override fun compareTo(other: Timer): Int {
- return target.compareTo(other.target)
- }
-
- override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]"
- }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
new file mode 100644
index 00000000..22a390e6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Specialized priority queue for flow timers.
+ */
+internal class FlowTimerQueue(initialCapacity: Int = 256) {
+ /**
+ * The binary heap of deadlines.
+ */
+ private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
+
+ /**
+ * The binary heap of [FlowConsumerContextImpl]s.
+ */
+ private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private var size = 0
+
+ /**
+ * Register a timer for [ctx] with [deadline].
+ */
+ fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
+ val i = size
+ val deadlines = _deadlines
+ if (i >= deadlines.size) {
+ grow()
+ }
+
+ siftUp(deadlines, _pending, i, ctx, deadline)
+
+ size = i + 1
+ }
+
+ /**
+ * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
+ */
+ fun poll(now: Long): FlowConsumerContextImpl? {
+ if (size == 0) {
+ return null
+ }
+
+ val deadlines = _deadlines
+ val deadline = deadlines[0]
+
+ if (now < deadline) {
+ return null
+ }
+
+ val pending = _pending
+ val res = pending[0]
+ val s = --size
+
+ val nextDeadline = deadlines[s]
+ val next = pending[s]!!
+
+ // Clear the last element of the queue
+ pending[s] = null
+ deadlines[s] = Long.MIN_VALUE
+
+ if (s != 0) {
+ siftDown(deadlines, pending, next, nextDeadline)
+ }
+
+ return res
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ */
+ fun peekDeadline(): Long {
+ return if (size == 0) Long.MAX_VALUE else _deadlines[0]
+ }
+
+ /**
+ * Increases the capacity of the array.
+ */
+ private fun grow() {
+ val oldCapacity = _deadlines.size
+ // Double size if small; else grow by 50%
+ val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
+
+ _deadlines = _deadlines.copyOf(newCapacity)
+ _pending = _pending.copyOf(newCapacity)
+ }
+
+ /**
+ * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
+ * greater than or equal to its parent, or is the root.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param pos The position to fill.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftUp(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ pos: Int,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = pos
+
+ while (k > 0) {
+ val parent = (k - 1) ushr 1
+ val parentDeadline = deadlines[parent]
+
+ if (deadline >= parentDeadline) {
+ break
+ }
+
+ deadlines[k] = parentDeadline
+ pending[k] = pending[parent]
+
+ k = parent
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+
+ /**
+ * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
+ * is less than or equal to its children or is a leaf.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftDown(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = 0
+ val size = size
+ val half = size ushr 1
+
+ while (k < half) {
+ var child = (k shl 1) + 1
+
+ var childDeadline = deadlines[child]
+ val right = child + 1
+
+ if (right < size) {
+ val rightDeadline = deadlines[right]
+
+ if (childDeadline > rightDeadline) {
+ child = right
+ childDeadline = rightDeadline
+ }
+ }
+
+ if (deadline <= childDeadline) {
+ break
+ }
+
+ deadlines[k] = childDeadline
+ pending[k] = pending[child]
+
+ k = child
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
index d2fa5228..d990dc61 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
@@ -27,17 +27,27 @@ import org.opendc.simulator.flow.FlowCounters
/**
* Mutable implementation of the [FlowCounters] interface.
*/
-internal class FlowCountersImpl : FlowCounters {
- override var demand: Double = 0.0
- override var actual: Double = 0.0
- override var remaining: Double = 0.0
- override var interference: Double = 0.0
+public class MutableFlowCounters : FlowCounters {
+ override val demand: Double
+ get() = _counters[0]
+ override val actual: Double
+ get() = _counters[1]
+ override val remaining: Double
+ get() = _counters[2]
+ override val interference: Double
+ get() = _counters[3]
+ private val _counters = DoubleArray(4)
override fun reset() {
- demand = 0.0
- actual = 0.0
- remaining = 0.0
- interference = 0.0
+ _counters.fill(0.0)
+ }
+
+ public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) {
+ val counters = _counters
+ counters[0] += demand
+ counters[1] += actual
+ counters[2] += remaining
+ counters[3] += interference
}
override fun toString(): String {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 97059e93..a0fb8a4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -25,7 +25,8 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceDomain
import org.opendc.simulator.flow.interference.InterferenceKey
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
import kotlin.math.min
@@ -85,7 +86,7 @@ public class MaxMinFlowMultiplexer(
private val scheduler = Scheduler(engine, parent)
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val provider = Input(engine, scheduler, interferenceDomain, key)
+ val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity)
_inputs.add(provider)
return provider
}
@@ -135,11 +136,11 @@ public class MaxMinFlowMultiplexer(
/**
* Helper class containing the scheduler state.
*/
- private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) {
+ private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
/**
* The flow counters of this scheduler.
*/
- @JvmField val counters = FlowCountersImpl()
+ @JvmField val counters = MutableFlowCounters()
/**
* The flow rate of the multiplexer.
@@ -167,6 +168,11 @@ public class MaxMinFlowMultiplexer(
private val _activeInputs = mutableListOf<Input>()
/**
+ * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
+ */
+ private var _inputArray = emptyArray<Input>()
+
+ /**
* The active outputs registered with the scheduler.
*/
private val _activeOutputs = mutableListOf<Output>()
@@ -184,10 +190,16 @@ public class MaxMinFlowMultiplexer(
private var _lastConvergeInput: Input? = null
/**
+ * The simulation clock.
+ */
+ private val _clock = engine.clock
+
+ /**
* Register the specified [input] to this scheduler.
*/
fun registerInput(input: Input) {
_activeInputs.add(input)
+ _inputArray = _activeInputs.toTypedArray()
val hasActivationOutput = activationOutput != null
@@ -195,7 +207,8 @@ public class MaxMinFlowMultiplexer(
input.shouldConsumerConverge = !hasActivationOutput
input.enableTimers = !hasActivationOutput
input.capacity = capacity
- trigger(engine.clock.millis())
+
+ trigger(_clock.millis())
}
/**
@@ -207,6 +220,8 @@ public class MaxMinFlowMultiplexer(
_lastConvergeInput = null
}
+ _activeInputs.remove(input)
+
// Re-run scheduler to distribute new load
trigger(now)
}
@@ -287,7 +302,7 @@ public class MaxMinFlowMultiplexer(
// a few inputs and little changes at the same timestamp.
// We always pick for option (1) unless there are no outputs available.
if (activationOutput != null) {
- activationOutput.pull()
+ activationOutput.pull(now)
return
} else {
runScheduler(now)
@@ -305,7 +320,7 @@ public class MaxMinFlowMultiplexer(
return try {
_schedulerActive = true
- doRunScheduler(delta)
+ doRunScheduler(now, delta)
} finally {
_schedulerActive = false
}
@@ -356,15 +371,17 @@ public class MaxMinFlowMultiplexer(
*
* @return The deadline after which a new scheduling cycle should start.
*/
- private fun doRunScheduler(delta: Long): Long {
+ private fun doRunScheduler(now: Long, delta: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ var inputArray = _inputArray
+ var inputSize = _inputArray.size
// Update the counters of the scheduler
updateCounters(delta)
// If there is no work yet, mark the inputs as idle.
- if (activeInputs.isEmpty()) {
+ if (inputSize == 0) {
demand = 0.0
rate = 0.0
return Long.MAX_VALUE
@@ -372,53 +389,70 @@ public class MaxMinFlowMultiplexer(
val capacity = capacity
var availableCapacity = capacity
+ var deadline = Long.MAX_VALUE
+ var demand = 0.0
+ var shouldRebuild = false
- // Pull in the work of the outputs
- val inputIterator = activeInputs.listIterator()
- for (input in inputIterator) {
- input.pullSync()
+ // Pull in the work of the inputs
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
- // Remove outputs that have finished
+ input.pullSync(now)
+
+ // Remove inputs that have finished
if (!input.isActive) {
input.actualRate = 0.0
- inputIterator.remove()
+ shouldRebuild = true
+ } else {
+ demand += input.limit
+ deadline = min(deadline, input.deadline)
}
}
- var demand = 0.0
- var deadline = Long.MAX_VALUE
+ // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
+ if (shouldRebuild) {
+ inputArray = activeInputs.toTypedArray()
+ inputSize = inputArray.size
+ _inputArray = inputArray
+ }
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- activeInputs.sort()
+ val rate = if (demand > capacity) {
+ // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
+ // constrained capacity across the inputs.
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- val size = activeInputs.size
- for (i in activeInputs.indices) {
- val input = activeInputs[i]
- val availableShare = availableCapacity / (size - i)
- val grantedRate = min(input.allowedRate, availableShare)
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ inputArray.sort()
- demand += input.limit
- deadline = min(deadline, input.deadline)
- availableCapacity -= grantedRate
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
+ val availableShare = availableCapacity / (inputSize - i)
+ val grantedRate = min(input.allowedRate, availableShare)
- input.actualRate = grantedRate
- }
+ availableCapacity -= grantedRate
+ input.actualRate = grantedRate
+ }
- val rate = capacity - availableCapacity
+ capacity - availableCapacity
+ } else {
+ demand
+ }
this.demand = demand
- this.rate = rate
-
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- output.push(grantedSpeed)
+ if (this.rate != rate) {
+ // Only update the outputs if the output rate has changed
+ this.rate = rate
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
}
return deadline
@@ -440,11 +474,16 @@ public class MaxMinFlowMultiplexer(
return
}
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
+ val demand = demand
+ val rate = rate
- counters.demand += demand * deltaS
- counters.actual += rate * deltaS
- counters.remaining += (previousCapacity - rate) * deltaS
+ counters.increment(
+ demand = demand * deltaS,
+ actual = rate * deltaS,
+ remaining = (previousCapacity - rate) * deltaS,
+ interference = 0.0
+ )
}
}
@@ -452,32 +491,48 @@ public class MaxMinFlowMultiplexer(
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private class Input(
- engine: FlowEngine,
+ private val engine: FlowEngine,
private val scheduler: Scheduler,
private val interferenceDomain: InterferenceDomain?,
- @JvmField val key: InterferenceKey?
- ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> {
+ @JvmField val key: InterferenceKey?,
+ initialCapacity: Double,
+ ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
/**
- * The requested limit.
+ * A flag to indicate that the consumer is active.
*/
- @JvmField var limit: Double = 0.0
+ override val isActive: Boolean
+ get() = _ctx != null
/**
- * The actual processing speed.
+ * The demand of the consumer.
*/
- @JvmField var actualRate: Double = 0.0
+ override val demand: Double
+ get() = limit
/**
- * The deadline of the input.
+ * The processing rate of the consumer.
*/
- val deadline: Long
- get() = ctx?.deadline ?: Long.MAX_VALUE
+ override val rate: Double
+ get() = actualRate
/**
- * The processing rate that is allowed by the model constraints.
+ * The capacity of the input.
*/
- val allowedRate: Double
- get() = min(capacity, limit)
+ override var capacity: Double
+ get() = _capacity
+ set(value) {
+ allowedRate = min(limit, value)
+ _capacity = value
+ _ctx?.capacity = value
+ }
+ private var _capacity = initialCapacity
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
/**
* A flag to enable timers for the input.
@@ -485,7 +540,7 @@ public class MaxMinFlowMultiplexer(
var enableTimers: Boolean = true
set(value) {
field = value
- ctx?.enableTimers = value
+ _ctx?.enableTimers = value
}
/**
@@ -494,10 +549,36 @@ public class MaxMinFlowMultiplexer(
var shouldConsumerConverge: Boolean = true
set(value) {
field = value
- ctx?.shouldConsumerConverge = value
+ _ctx?.shouldConsumerConverge = value
}
/**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ @JvmField var allowedRate: Double = 0.0
+
+ /**
+ * The deadline of the input.
+ */
+ val deadline: Long
+ get() = _ctx?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * The [FlowConsumerContext] that is currently running.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ /**
* A flag to indicate that the input is closed.
*/
private var _isClosed: Boolean = false
@@ -512,13 +593,33 @@ public class MaxMinFlowMultiplexer(
cancel()
}
- /* AbstractFlowConsumer */
- override fun createLogic(): FlowConsumerLogic = this
+ /**
+ * Pull the source if necessary.
+ */
+ fun pullSync(now: Long) {
+ _ctx?.pullSync(now)
+ }
- override fun start(ctx: FlowConsumerContext) {
+ /* FlowConsumer */
+ override fun startConsumer(source: FlowSource) {
check(!_isClosed) { "Cannot re-use closed input" }
+ check(_ctx == null) { "Consumer is in invalid state" }
+
+ val ctx = engine.newContext(source, this)
+ _ctx = ctx
+
+ ctx.capacity = capacity
scheduler.registerInput(this)
- super.start(ctx)
+
+ ctx.start()
+ }
+
+ override fun pull() {
+ _ctx?.pull()
+ }
+
+ override fun cancel() {
+ _ctx?.close()
}
/* FlowConsumerLogic */
@@ -530,8 +631,10 @@ public class MaxMinFlowMultiplexer(
) {
doUpdateCounters(delta)
- actualRate = 0.0
+ val allowed = min(rate, capacity)
limit = rate
+ actualRate = allowed
+ allowedRate = allowed
scheduler.trigger(now)
}
@@ -541,11 +644,11 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
+ allowedRate = 0.0
scheduler.deregisterInput(this, now)
- // BUG: Cancel the connection so that `ctx` is set to `null`
- cancel()
+ _ctx = null
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
@@ -556,13 +659,6 @@ public class MaxMinFlowMultiplexer(
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
/**
- * Pull the source if necessary.
- */
- fun pullSync() {
- ctx?.pullSync()
- }
-
- /**
* Helper method to update the flow counters of the multiplexer.
*/
private fun doUpdateCounters(delta: Long) {
@@ -578,14 +674,16 @@ public class MaxMinFlowMultiplexer(
1.0
}
- val deltaS = delta / 1000.0
+ val actualRate = actualRate
+
+ val deltaS = delta * D_MS_TO_S
val demand = limit * deltaS
val actual = actualRate * deltaS
- val remaining = (capacity - actualRate) * deltaS
+ val remaining = (_capacity - actualRate) * deltaS
+ val interference = actual * max(0.0, 1 - perfScore)
- updateCounters(demand, actual, remaining)
-
- scheduler.counters.interference += actual * max(0.0, 1 - perfScore)
+ _counters.increment(demand, actual, remaining, interference)
+ scheduler.counters.increment(0.0, 0.0, 0.0, interference)
}
}
@@ -636,8 +734,8 @@ public class MaxMinFlowMultiplexer(
/**
* Pull this output.
*/
- fun pull() {
- _conn?.pull()
+ fun pull(now: Long) {
+ _conn?.pull(now)
}
override fun onStart(conn: FlowConnection, now: Long) {
@@ -675,6 +773,7 @@ public class MaxMinFlowMultiplexer(
// Output is not the activation output, so trigger activation output and do not install timer for this
// output (by returning `Long.MAX_VALUE`)
scheduler.trigger(now)
+
Long.MAX_VALUE
}
}