summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-07 14:39:03 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-08 17:11:52 +0200
commit774ed886ac8f84ae2974c1204534ee332d920864 (patch)
treee390f4393d194d435c75a64fcb45b9d52d4123d1 /opendc-simulator/opendc-simulator-flow/src
parenta0340a8752c4c4ed8413944b1dfb81b9481b6556 (diff)
fix(simulator): Count interference for multiplexer inputs
This change updates the SimAbstractHypervisor and MaxMinFlowMultiplexer to count interference of multiplexer inputs, instead of only counting them for the scheduler as a whole.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt9
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt12
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt)28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt147
5 files changed, 138 insertions, 84 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
index b02426e3..5f1057e8 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.MutableFlowCounters
/**
* Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
@@ -56,13 +56,6 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
get() = ctx?.demand ?: 0.0
/**
- * The flow counters to track the flow metrics of the consumer.
- */
- public override val counters: FlowCounters
- get() = _counters
- private val _counters = FlowCountersImpl()
-
- /**
* The [FlowConsumerContext] that is currently running.
*/
protected var ctx: FlowConsumerContext? = null
@@ -89,7 +82,7 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
/**
* Update the counters of the flow consumer.
*/
- protected fun updateCounters(ctx: FlowConnection, delta: Long) {
+ protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) {
val demand = _previousDemand
val capacity = _previousCapacity
@@ -100,25 +93,12 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi
return
}
- val counters = _counters
val deltaS = delta / 1000.0
val total = demand * deltaS
val work = capacity * deltaS
val actualWork = ctx.rate * deltaS
- counters.demand += work
- counters.actual += actualWork
- counters.remaining += (total - actualWork)
- }
-
- /**
- * Update the counters of the flow consumer.
- */
- protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
- val counters = _counters
- counters.demand += demand
- counters.actual += actual
- counters.remaining += remaining
+ increment(work, actualWork, (total - actualWork), 0.0)
}
final override fun startConsumer(source: FlowSource) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 7eaaf6c2..229fd96a 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -23,7 +23,7 @@
package org.opendc.simulator.flow
import mu.KotlinLogging
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
/**
@@ -117,7 +117,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
override val counters: FlowCounters
get() = _counters
- private val _counters = FlowCountersImpl()
+ private val _counters = MutableFlowCounters()
override fun startConsumer(source: FlowSource) {
check(delegate == null) { "Forwarder already active" }
@@ -245,8 +245,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
- counters.demand += work
- counters.actual += actualWork
- counters.remaining += (total - actualWork)
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index b4eb6a38..170ab1c0 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.flow
+import org.opendc.simulator.flow.internal.MutableFlowCounters
+
/**
* A [FlowSink] represents a sink with a fixed capacity.
*
@@ -34,6 +36,12 @@ public class FlowSink(
initialCapacity: Double,
private val parent: FlowConvergenceListener? = null
) : AbstractFlowConsumer(engine, initialCapacity) {
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
override fun start(ctx: FlowConsumerContext) {
if (parent != null) {
@@ -52,11 +60,11 @@ public class FlowSink(
delta: Long,
rate: Double
) {
- updateCounters(ctx, delta)
+ _counters.update(ctx, delta)
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- updateCounters(ctx, delta)
+ _counters.update(ctx, delta)
cancel()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
index d2fa5228..d990dc61 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
@@ -27,17 +27,27 @@ import org.opendc.simulator.flow.FlowCounters
/**
* Mutable implementation of the [FlowCounters] interface.
*/
-internal class FlowCountersImpl : FlowCounters {
- override var demand: Double = 0.0
- override var actual: Double = 0.0
- override var remaining: Double = 0.0
- override var interference: Double = 0.0
+public class MutableFlowCounters : FlowCounters {
+ override val demand: Double
+ get() = _counters[0]
+ override val actual: Double
+ get() = _counters[1]
+ override val remaining: Double
+ get() = _counters[2]
+ override val interference: Double
+ get() = _counters[3]
+ private val _counters = DoubleArray(4)
override fun reset() {
- demand = 0.0
- actual = 0.0
- remaining = 0.0
- interference = 0.0
+ _counters.fill(0.0)
+ }
+
+ public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) {
+ val counters = _counters
+ counters[0] += demand
+ counters[1] += actual
+ counters[2] += remaining
+ counters[3] += interference
}
override fun toString(): String {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 9131eb54..eaa3f7c5 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -25,7 +25,7 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceDomain
import org.opendc.simulator.flow.interference.InterferenceKey
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
import kotlin.math.min
@@ -85,7 +85,7 @@ public class MaxMinFlowMultiplexer(
private val scheduler = Scheduler(engine, parent)
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val provider = Input(engine, scheduler, interferenceDomain, key)
+ val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity)
_inputs.add(provider)
return provider
}
@@ -135,11 +135,11 @@ public class MaxMinFlowMultiplexer(
/**
* Helper class containing the scheduler state.
*/
- private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) {
+ private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
/**
* The flow counters of this scheduler.
*/
- @JvmField val counters = FlowCountersImpl()
+ @JvmField val counters = MutableFlowCounters()
/**
* The flow rate of the multiplexer.
@@ -184,6 +184,11 @@ public class MaxMinFlowMultiplexer(
private var _lastConvergeInput: Input? = null
/**
+ * The simulation clock.
+ */
+ private val _clock = engine.clock
+
+ /**
* Register the specified [input] to this scheduler.
*/
fun registerInput(input: Input) {
@@ -195,7 +200,7 @@ public class MaxMinFlowMultiplexer(
input.shouldConsumerConverge = !hasActivationOutput
input.enableTimers = !hasActivationOutput
input.capacity = capacity
- trigger(engine.clock.millis())
+ trigger(_clock.millis())
}
/**
@@ -447,10 +452,15 @@ public class MaxMinFlowMultiplexer(
}
val deltaS = delta / 1000.0
-
- counters.demand += demand * deltaS
- counters.actual += rate * deltaS
- counters.remaining += (previousCapacity - rate) * deltaS
+ val demand = demand
+ val rate = rate
+
+ counters.increment(
+ demand = demand * deltaS,
+ actual = rate * deltaS,
+ remaining = (previousCapacity - rate) * deltaS,
+ interference = 0.0
+ )
}
}
@@ -458,41 +468,48 @@ public class MaxMinFlowMultiplexer(
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private class Input(
- engine: FlowEngine,
+ private val engine: FlowEngine,
private val scheduler: Scheduler,
private val interferenceDomain: InterferenceDomain?,
- @JvmField val key: InterferenceKey?
- ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> {
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
+ @JvmField val key: InterferenceKey?,
+ initialCapacity: Double,
+ ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
/**
- * The actual processing speed.
+ * A flag to indicate that the consumer is active.
*/
- @JvmField var actualRate: Double = 0.0
+ override val isActive: Boolean
+ get() = _ctx != null
/**
- * The processing rate that is allowed by the model constraints.
+ * The demand of the consumer.
*/
- @JvmField var allowedRate: Double = 0.0
+ override val demand: Double
+ get() = limit
/**
- * The deadline of the input.
+ * The processing rate of the consumer.
*/
- val deadline: Long
- get() = ctx?.deadline ?: Long.MAX_VALUE
+ override val rate: Double
+ get() = actualRate
/**
* The capacity of the input.
*/
override var capacity: Double
- get() = super.capacity
+ get() = _capacity
set(value) {
allowedRate = min(limit, value)
- super.capacity = value
+ _capacity = value
+ _ctx?.capacity = value
}
+ private var _capacity = initialCapacity
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
/**
* A flag to enable timers for the input.
@@ -500,7 +517,7 @@ public class MaxMinFlowMultiplexer(
var enableTimers: Boolean = true
set(value) {
field = value
- ctx?.enableTimers = value
+ _ctx?.enableTimers = value
}
/**
@@ -509,10 +526,36 @@ public class MaxMinFlowMultiplexer(
var shouldConsumerConverge: Boolean = true
set(value) {
field = value
- ctx?.shouldConsumerConverge = value
+ _ctx?.shouldConsumerConverge = value
}
/**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ @JvmField var allowedRate: Double = 0.0
+
+ /**
+ * The deadline of the input.
+ */
+ val deadline: Long
+ get() = _ctx?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * The [FlowConsumerContext] that is currently running.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ /**
* A flag to indicate that the input is closed.
*/
private var _isClosed: Boolean = false
@@ -527,13 +570,33 @@ public class MaxMinFlowMultiplexer(
cancel()
}
- /* AbstractFlowConsumer */
- override fun createLogic(): FlowConsumerLogic = this
+ /**
+ * Pull the source if necessary.
+ */
+ fun pullSync() {
+ _ctx?.pullSync()
+ }
- override fun start(ctx: FlowConsumerContext) {
+ /* FlowConsumer */
+ override fun startConsumer(source: FlowSource) {
check(!_isClosed) { "Cannot re-use closed input" }
+ check(_ctx == null) { "Consumer is in invalid state" }
+
+ val ctx = engine.newContext(source, this)
+ _ctx = ctx
+
+ ctx.capacity = capacity
scheduler.registerInput(this)
- super.start(ctx)
+
+ ctx.start()
+ }
+
+ override fun pull() {
+ _ctx?.pull()
+ }
+
+ override fun cancel() {
+ _ctx?.close()
}
/* FlowConsumerLogic */
@@ -562,8 +625,7 @@ public class MaxMinFlowMultiplexer(
scheduler.deregisterInput(this, now)
- // BUG: Cancel the connection so that `ctx` is set to `null`
- cancel()
+ _ctx = null
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
@@ -574,13 +636,6 @@ public class MaxMinFlowMultiplexer(
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
/**
- * Pull the source if necessary.
- */
- fun pullSync() {
- ctx?.pullSync()
- }
-
- /**
* Helper method to update the flow counters of the multiplexer.
*/
private fun doUpdateCounters(delta: Long) {
@@ -596,14 +651,16 @@ public class MaxMinFlowMultiplexer(
1.0
}
+ val actualRate = actualRate
+
val deltaS = delta / 1000.0
val demand = limit * deltaS
val actual = actualRate * deltaS
- val remaining = (capacity - actualRate) * deltaS
-
- updateCounters(demand, actual, remaining)
+ val remaining = (_capacity - actualRate) * deltaS
+ val interference = actual * max(0.0, 1 - perfScore)
- scheduler.counters.interference += actual * max(0.0, 1 - perfScore)
+ _counters.increment(demand, actual, remaining, interference)
+ scheduler.counters.increment(0.0, 0.0, 0.0, interference)
}
}