summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt127
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt123
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt5
5 files changed, 133 insertions, 153 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
deleted file mode 100644
index 5f1057e8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
- */
-public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer {
- /**
- * A flag to indicate that the flow consumer is active.
- */
- public override val isActive: Boolean
- get() = ctx != null
-
- /**
- * The capacity of the consumer.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
- /**
- * The current processing rate of the consumer.
- */
- public override val rate: Double
- get() = ctx?.rate ?: 0.0
-
- /**
- * The flow processing rate demand at this instant.
- */
- public override val demand: Double
- get() = ctx?.demand ?: 0.0
-
- /**
- * The [FlowConsumerContext] that is currently running.
- */
- protected var ctx: FlowConsumerContext? = null
- private set
-
- /**
- * Construct the [FlowConsumerLogic] instance for a new source.
- */
- protected abstract fun createLogic(): FlowConsumerLogic
-
- /**
- * Start the specified [FlowConsumerContext].
- */
- protected open fun start(ctx: FlowConsumerContext) {
- ctx.start()
- }
-
- /**
- * The previous demand for the consumer.
- */
- private var _previousDemand = 0.0
- private var _previousCapacity = 0.0
-
- /**
- * Update the counters of the flow consumer.
- */
- protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) {
- val demand = _previousDemand
- val capacity = _previousCapacity
-
- _previousDemand = ctx.demand
- _previousCapacity = ctx.capacity
-
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta / 1000.0
- val total = demand * deltaS
- val work = capacity * deltaS
- val actualWork = ctx.rate * deltaS
-
- increment(work, actualWork, (total - actualWork), 0.0)
- }
-
- final override fun startConsumer(source: FlowSource) {
- check(ctx == null) { "Consumer is in invalid state" }
- val ctx = engine.newContext(source, createLogic())
-
- ctx.capacity = capacity
- this.ctx = ctx
-
- start(ctx)
- }
-
- final override fun pull() {
- ctx?.pull()
- }
-
- final override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.close()
- }
- }
-
- override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]"
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 229fd96a..7230a966 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -23,6 +23,7 @@
package org.opendc.simulator.flow
import mu.KotlinLogging
+import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
@@ -241,7 +242,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
val counters = _counters
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index 170ab1c0..e9094443 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -22,6 +22,7 @@
package org.opendc.simulator.flow
+import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
/**
@@ -35,7 +36,34 @@ public class FlowSink(
private val engine: FlowEngine,
initialCapacity: Double,
private val parent: FlowConvergenceListener? = null
-) : AbstractFlowConsumer(engine, initialCapacity) {
+) : FlowConsumer {
+ /**
+ * A flag to indicate that the flow consumer is active.
+ */
+ public override val isActive: Boolean
+ get() = _ctx != null
+
+ /**
+ * The capacity of the consumer.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ _ctx?.capacity = value
+ }
+
+ /**
+ * The current processing rate of the consumer.
+ */
+ public override val rate: Double
+ get() = _ctx?.rate ?: 0.0
+
+ /**
+ * The flow processing rate demand at this instant.
+ */
+ public override val demand: Double
+ get() = _ctx?.demand ?: 0.0
+
/**
* The flow counters to track the flow metrics of the consumer.
*/
@@ -43,36 +71,85 @@ public class FlowSink(
get() = _counters
private val _counters = MutableFlowCounters()
- override fun start(ctx: FlowConsumerContext) {
+ /**
+ * The current active [FlowConsumerLogic] of this sink.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ override fun startConsumer(source: FlowSource) {
+ check(_ctx == null) { "Consumer is in invalid state" }
+
+ val ctx = engine.newContext(source, Logic(parent, _counters))
+ _ctx = ctx
+
+ ctx.capacity = capacity
if (parent != null) {
ctx.shouldConsumerConverge = true
}
- super.start(ctx)
+
+ ctx.start()
}
- override fun createLogic(): FlowConsumerLogic {
- return object : FlowConsumerLogic {
- private val parent = this@FlowSink.parent
-
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- delta: Long,
- rate: Double
- ) {
- _counters.update(ctx, delta)
- }
+ override fun pull() {
+ _ctx?.pull()
+ }
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- _counters.update(ctx, delta)
- cancel()
- }
+ override fun cancel() {
+ _ctx?.close()
+ }
+
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
+
+ /**
+ * [FlowConsumerLogic] of a sink.
+ */
+ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ updateCounters(ctx, delta, rate, ctx.capacity)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ updateCounters(ctx, delta, 0.0, 0.0)
+
+ _ctx = null
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now, delta)
+ }
+
+ /**
+ * The previous demand and capacity for the consumer.
+ */
+ private val _previous = DoubleArray(2)
+
+ /**
+ * Update the counters of the flow consumer.
+ */
+ private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ val counters = counters
+ val previous = _previous
+ val demand = previous[0]
+ val capacity = previous[1]
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ previous[0] = nextDemand
+ previous[1] = nextCapacity
+
+ if (delta <= 0) {
+ return
}
+
+ val deltaS = delta * D_MS_TO_S
+ val total = demand * deltaS
+ val work = capacity * deltaS
+ val actualWork = ctx.rate * deltaS
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
}
}
-
- override fun toString(): String = "FlowSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
new file mode 100644
index 00000000..450195ec
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Constant for converting milliseconds into seconds.
+ */
+internal const val D_MS_TO_S = 1 / 1000.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index eaa3f7c5..31fb5b73 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceDomain
import org.opendc.simulator.flow.interference.InterferenceKey
+import org.opendc.simulator.flow.internal.D_MS_TO_S
import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
import kotlin.math.min
@@ -451,7 +452,7 @@ public class MaxMinFlowMultiplexer(
return
}
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
val demand = demand
val rate = rate
@@ -653,7 +654,7 @@ public class MaxMinFlowMultiplexer(
val actualRate = actualRate
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
val demand = limit * deltaS
val actual = actualRate * deltaS
val remaining = (_capacity - actualRate) * deltaS