summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt13
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt90
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt233
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt38
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt83
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt52
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt14
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt40
-rw-r--r--opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt44
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt147
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt7
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt34
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt116
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt195
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt)28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt263
21 files changed, 1114 insertions, 516 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
index b8e0227a..cb52d24f 100644
--- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt
@@ -22,7 +22,6 @@
package org.opendc.simulator.compute
-import javafx.application.Application.launch
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
@@ -34,6 +33,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTrace
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit
@OptIn(ExperimentalCoroutinesApi::class)
class SimMachineBenchmarks {
private lateinit var machineModel: MachineModel
- private lateinit var trace: Sequence<SimTraceWorkload.Fragment>
+ private lateinit var trace: SimTrace
@Setup
fun setUp() {
@@ -60,8 +60,13 @@ class SimMachineBenchmarks {
)
val random = ThreadLocalRandom.current()
- val entries = List(10000) { SimTraceWorkload.Fragment(it * 1000L, 1000, random.nextDouble(0.0, 4500.0), 1) }
- trace = entries.asSequence()
+ val builder = SimTrace.builder()
+ repeat(10000) {
+ val timestamp = it.toLong()
+ val deadline = timestamp + 1000
+ builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1)
+ }
+ trace = builder.build()
}
@Benchmark
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
index aac8b959..f6d8f628 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt
@@ -66,45 +66,7 @@ public abstract class SimAbstractHypervisor(
*/
public override val counters: SimHypervisorCounters
get() = _counters
- private val _counters = object : SimHypervisorCounters {
- @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity
-
- override var cpuActiveTime: Long = 0L
- override var cpuIdleTime: Long = 0L
- override var cpuStealTime: Long = 0L
- override var cpuLostTime: Long = 0L
-
- private var _previousDemand = 0.0
- private var _previousActual = 0.0
- private var _previousRemaining = 0.0
- private var _previousInterference = 0.0
-
- /**
- * Record the CPU time of the hypervisor.
- */
- fun record() {
- val counters = mux.counters
- val demand = counters.demand
- val actual = counters.actual
- val remaining = counters.remaining
- val interference = counters.interference
-
- val demandDelta = demand - _previousDemand
- val actualDelta = actual - _previousActual
- val remainingDelta = remaining - _previousRemaining
- val interferenceDelta = interference - _previousInterference
-
- _previousDemand = demand
- _previousActual = actual
- _previousRemaining = remaining
- _previousInterference = interference
-
- cpuActiveTime += (actualDelta * d).roundToLong()
- cpuIdleTime += (remainingDelta * d).roundToLong()
- cpuStealTime += ((demandDelta - actualDelta) * d).roundToLong()
- cpuLostTime += (interferenceDelta * d).roundToLong()
- }
- }
+ private val _counters = CountersImpl(this)
/**
* The CPU capacity of the hypervisor in MHz.
@@ -204,7 +166,7 @@ public abstract class SimAbstractHypervisor(
get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong()
override val cpuStealTime: Long
get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong()
- override val cpuLostTime: Long = 0L
+ override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong()
}
/**
@@ -277,4 +239,52 @@ public abstract class SimAbstractHypervisor(
override val min: Double = 0.0
}
+
+ /**
+ * Implementation of [SimHypervisorCounters].
+ */
+ private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters {
+ @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity
+
+ override val cpuActiveTime: Long
+ get() = _cpuTime[0]
+ override val cpuIdleTime: Long
+ get() = _cpuTime[1]
+ override val cpuStealTime: Long
+ get() = _cpuTime[2]
+ override val cpuLostTime: Long
+ get() = _cpuTime[3]
+
+ private val _cpuTime = LongArray(4)
+ private val _previous = DoubleArray(4)
+
+ /**
+ * Record the CPU time of the hypervisor.
+ */
+ fun record() {
+ val cpuTime = _cpuTime
+ val previous = _previous
+ val counters = hv.mux.counters
+
+ val demand = counters.demand
+ val actual = counters.actual
+ val remaining = counters.remaining
+ val interference = counters.interference
+
+ val demandDelta = demand - previous[0]
+ val actualDelta = actual - previous[1]
+ val remainingDelta = remaining - previous[2]
+ val interferenceDelta = interference - previous[3]
+
+ previous[0] = demand
+ previous[1] = actual
+ previous[2] = remaining
+ previous[3] = interference
+
+ cpuTime[0] += (actualDelta * d).roundToLong()
+ cpuTime[1] += (remainingDelta * d).roundToLong()
+ cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong()
+ cpuTime[3] += (interferenceDelta * d).roundToLong()
+ }
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
new file mode 100644
index 00000000..4f567b55
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt
@@ -0,0 +1,233 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+import kotlin.math.min
+
+/**
+ * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s.
+ *
+ * @param usageCol The column containing the CPU usage of each fragment (in MHz).
+ * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis).
+ * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis).
+ * @param coresCol The column containing the utilized cores.
+ * @param size The number of fragments in the trace.
+ */
+public class SimTrace(
+ private val usageCol: DoubleArray,
+ private val timestampCol: LongArray,
+ private val deadlineCol: LongArray,
+ private val coresCol: IntArray,
+ private val size: Int,
+) {
+ init {
+ require(size >= 0) { "Invalid trace size" }
+ require(usageCol.size >= size) { "Invalid number of usage entries" }
+ require(timestampCol.size >= size) { "Invalid number of timestamp entries" }
+ require(deadlineCol.size >= size) { "Invalid number of deadline entries" }
+ require(coresCol.size >= size) { "Invalid number of core entries" }
+ }
+
+ public companion object {
+ /**
+ * Construct a [SimTrace] with the specified fragments.
+ */
+ public fun ofFragments(fragments: List<SimTraceFragment>): SimTrace {
+ val size = fragments.size
+ val usageCol = DoubleArray(size)
+ val timestampCol = LongArray(size)
+ val deadlineCol = LongArray(size)
+ val coresCol = IntArray(size)
+
+ for (i in fragments.indices) {
+ val fragment = fragments[i]
+ usageCol[i] = fragment.usage
+ timestampCol[i] = fragment.timestamp
+ deadlineCol[i] = fragment.timestamp + fragment.duration
+ coresCol[i] = fragment.cores
+ }
+
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * Construct a [SimTrace] with the specified fragments.
+ */
+ @JvmStatic
+ public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace {
+ val size = fragments.size
+ val usageCol = DoubleArray(size)
+ val timestampCol = LongArray(size)
+ val deadlineCol = LongArray(size)
+ val coresCol = IntArray(size)
+
+ for (i in fragments.indices) {
+ val fragment = fragments[i]
+ usageCol[i] = fragment.usage
+ timestampCol[i] = fragment.timestamp
+ deadlineCol[i] = fragment.timestamp + fragment.duration
+ coresCol[i] = fragment.cores
+ }
+
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * Create a [SimTrace.Builder] instance.
+ */
+ @JvmStatic
+ public fun builder(): Builder = Builder()
+ }
+
+ /**
+ * Construct a new [FlowSource] for the specified [cpu].
+ */
+ public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource {
+ return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+
+ /**
+ * A builder class for a [SimTrace].
+ */
+ public class Builder internal constructor() {
+ /**
+ * The columns of the trace.
+ */
+ private var usageCol: DoubleArray = DoubleArray(16)
+ private var timestampCol: LongArray = LongArray(16)
+ private var deadlineCol: LongArray = LongArray(16)
+ private var coresCol: IntArray = IntArray(16)
+
+ /**
+ * The number of entries in the trace.
+ */
+ private var size = 0
+
+ /**
+ * Add the specified [SimTraceFragment] to the trace.
+ */
+ public fun add(fragment: SimTraceFragment) {
+ add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores)
+ }
+
+ /**
+ * Add a fragment to the trace.
+ *
+ * @param timestamp Timestamp at which the fragment starts (in epoch millis).
+ * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param usage CPU usage of this fragment.
+ * @param cores Number of cores used.
+ */
+ public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) {
+ val size = size
+
+ if (size == usageCol.size) {
+ grow()
+ }
+
+ timestampCol[size] = timestamp
+ deadlineCol[size] = deadline
+ usageCol[size] = usage
+ coresCol[size] = cores
+
+ this.size++
+ }
+
+ /**
+ * Helper function to grow the capacity of the column arrays.
+ */
+ private fun grow() {
+ val arraySize = usageCol.size
+ val newSize = arraySize * 2
+
+ usageCol = usageCol.copyOf(newSize)
+ timestampCol = timestampCol.copyOf(newSize)
+ deadlineCol = deadlineCol.copyOf(newSize)
+ coresCol = coresCol.copyOf(newSize)
+ }
+
+ /**
+ * Construct the immutable [SimTrace].
+ */
+ public fun build(): SimTrace {
+ return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size)
+ }
+ }
+
+ /**
+ * A CPU consumer for the trace workload.
+ */
+ private class CpuConsumer(
+ cpu: ProcessingUnit,
+ private val offset: Long,
+ private val usageCol: DoubleArray,
+ private val timestampCol: LongArray,
+ private val deadlineCol: LongArray,
+ private val coresCol: IntArray,
+ private val size: Int
+ ) : FlowSource {
+ private val id = cpu.id
+ private val coreCount = cpu.node.coreCount
+
+ /**
+ * The index in the trace.
+ */
+ private var _idx = 0
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val size = size
+ val nowOffset = now - offset
+
+ var idx = _idx
+ val deadlines = deadlineCol
+ var deadline = deadlines[idx]
+
+ while (deadline <= nowOffset && ++idx < size) {
+ deadline = deadlines[idx]
+ }
+
+ if (idx >= size) {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ _idx = idx
+ val timestamp = timestampCol[idx]
+
+ // Fragment is in the future
+ if (timestamp > nowOffset) {
+ conn.push(0.0)
+ return timestamp - nowOffset
+ }
+
+ val cores = min(coreCount, coresCol[idx])
+ val usage = usageCol[idx]
+
+ conn.push(if (id < cores) usage / cores else 0.0)
+ return deadline - nowOffset
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt
new file mode 100644
index 00000000..5285847f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload
+
+/**
+ * A fragment of the workload trace.
+ *
+ * @param timestamp The timestamp at which the fragment starts (in epoch millis).
+ * @param duration The duration of the fragment (in milliseconds).
+ * @param usage The CPU usage during the fragment (in MHz).
+ * @param cores The amount of cores utilized during the fragment.
+ */
+public data class SimTraceFragment(
+ @JvmField val timestamp: Long,
+ @JvmField val duration: Long,
+ @JvmField val usage: Double,
+ @JvmField val cores: Int
+)
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
index 49ae5933..53c98409 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt
@@ -23,10 +23,6 @@
package org.opendc.simulator.compute.workload
import org.opendc.simulator.compute.SimMachineContext
-import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-import kotlin.math.min
/**
* A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource
@@ -35,89 +31,14 @@ import kotlin.math.min
* @param trace The trace of fragments to use.
* @param offset The offset for the timestamps.
*/
-public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload {
- private val iterator = trace.iterator()
- private var fragment: Fragment? = null
-
+public class SimTraceWorkload(private val trace: SimTrace, private val offset: Long = 0L) : SimWorkload {
override fun onStart(ctx: SimMachineContext) {
val lifecycle = SimWorkloadLifecycle(ctx)
for (cpu in ctx.cpus) {
- cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model)))
+ cpu.startConsumer(lifecycle.waitFor(trace.newSource(cpu.model, offset)))
}
}
override fun toString(): String = "SimTraceWorkload"
-
- /**
- * Obtain the fragment with a timestamp equal or greater than [now].
- */
- private fun pullFragment(now: Long): Fragment? {
- // Return the most recent fragment if its starting time + duration is later than `now`
- var fragment = fragment
- if (fragment != null && fragment.timestamp + offset + fragment.duration > now) {
- return fragment
- }
-
- while (iterator.hasNext()) {
- fragment = iterator.next()
- if (fragment.timestamp + offset + fragment.duration > now) {
- this.fragment = fragment
- return fragment
- }
- }
-
- this.fragment = null
- return null
- }
-
- private inner class Consumer(cpu: ProcessingUnit) : FlowSource {
- private val offset = this@SimTraceWorkload.offset
- private val id = cpu.id
- private val coreCount = cpu.node.coreCount
-
- override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
- val fragment = pullFragment(now)
-
- if (fragment == null) {
- conn.close()
- return Long.MAX_VALUE
- }
-
- val timestamp = fragment.timestamp + offset
-
- // Fragment is in the future
- if (timestamp > now) {
- conn.push(0.0)
- return timestamp - now
- }
-
- val cores = min(coreCount, fragment.cores)
- val usage = if (fragment.cores > 0)
- fragment.usage / cores
- else
- 0.0
- val deadline = timestamp + fragment.duration
- val duration = deadline - now
-
- conn.push(if (id < cores && usage > 0.0) usage else 0.0)
-
- return duration
- }
- }
-
- /**
- * A fragment of the workload.
- *
- * @param timestamp The timestamp at which the fragment starts.
- * @param duration The duration of the fragment.
- * @param usage The CPU usage during the fragment.
- * @param cores The amount of cores utilized during the fragment.
- */
- public data class Fragment(
- @JvmField val timestamp: Long,
- @JvmField val duration: Long,
- @JvmField val usage: Double,
- @JvmField val cores: Int
- )
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
index 9db2e6ec..6f32cf46 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt
@@ -38,6 +38,8 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
+import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.SimTraceFragment
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -66,11 +68,11 @@ internal class SimFairShareHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
@@ -106,20 +108,20 @@ internal class SimFairShareHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3100.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
@@ -201,20 +203,20 @@ internal class SimFairShareHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
val workloadB =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 3100.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1)
)
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
index b05ffd22..02d308ff 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt
@@ -36,9 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.simulator.compute.workload.SimRuntimeWorkload
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.*
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
@@ -66,11 +64,11 @@ internal class SimSpaceSharedHypervisorTest {
val duration = 5 * 60L
val workloadA =
SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1),
- SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1),
- SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1),
- SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, duration * 1000, 28.0, 1),
+ SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1),
+ SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1),
+ SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1)
),
)
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
index cdbffe4b..574860e8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt
@@ -58,11 +58,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 2),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 0
)
@@ -85,11 +85,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 2),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 1000
)
@@ -112,11 +112,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 2),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 2),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 0
)
@@ -140,11 +140,11 @@ class SimTraceWorkloadTest {
)
val workload = SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2),
- SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2),
- SimTraceWorkload.Fragment(2000, 1000, 0.0, 0),
- SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2)
+ SimTrace.ofFragments(
+ SimTraceFragment(0, 1000, 2 * 28.0, 2),
+ SimTraceFragment(1000, 1000, 2 * 3100.0, 2),
+ SimTraceFragment(2000, 1000, 0.0, 0),
+ SimTraceFragment(3000, 1000, 2 * 73.0, 2)
),
offset = 0
)
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt
index e2f7874c..908e902a 100644
--- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt
+++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt
@@ -37,11 +37,6 @@ import kotlin.coroutines.CoroutineContext
@OptIn(InternalCoroutinesApi::class)
public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay {
/**
- * The virtual clock of this dispatcher.
- */
- override val clock: Clock = VirtualClock()
-
- /**
* Queue of ordered tasks to run.
*/
private val queue = PriorityQueue<TimedRunnable>()
@@ -54,7 +49,12 @@ public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationCo
/**
* The current virtual time of simulation
*/
- private var _time = 0L
+ private var _clock = SimClock()
+
+ /**
+ * The virtual clock of this dispatcher.
+ */
+ override val clock: Clock = ClockAdapter(_clock)
override fun dispatch(context: CoroutineContext, block: Runnable) {
block.run()
@@ -79,14 +79,14 @@ public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationCo
}
override fun toString(): String {
- return "SimulationCoroutineDispatcher[time=${_time}ms, queued=${queue.size}]"
+ return "SimulationCoroutineDispatcher[time=${_clock.time}ms, queued=${queue.size}]"
}
private fun post(block: Runnable) =
queue.add(TimedRunnable(block, _counter++))
private fun postDelayed(block: Runnable, delayTime: Long) =
- TimedRunnable(block, _counter++, safePlus(_time, delayTime))
+ TimedRunnable(block, _counter++, safePlus(_clock.time, delayTime))
.also {
queue.add(it)
}
@@ -100,31 +100,41 @@ public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationCo
override fun advanceUntilIdle(): Long {
val queue = queue
- val oldTime = _time
- while (queue.isNotEmpty()) {
- val current = queue.poll()
+ val clock = _clock
+ val oldTime = clock.time
+
+ while (true) {
+ val current = queue.poll() ?: break
// If the scheduled time is 0 (immediate) use current virtual time
if (current.time != 0L) {
- _time = current.time
+ clock.time = current.time
}
current.run()
}
- return _time - oldTime
+ return clock.time - oldTime
}
- private inner class VirtualClock(private val zone: ZoneId = ZoneId.systemDefault()) : Clock() {
+ /**
+ * A helper class that holds the time of the simulation.
+ */
+ private class SimClock(@JvmField var time: Long = 0)
+
+ /**
+ * A helper class to expose a [Clock] instance for this dispatcher.
+ */
+ private class ClockAdapter(private val clock: SimClock, private val zone: ZoneId = ZoneId.systemDefault()) : Clock() {
override fun getZone(): ZoneId = zone
- override fun withZone(zone: ZoneId): Clock = VirtualClock(zone)
+ override fun withZone(zone: ZoneId): Clock = ClockAdapter(clock, zone)
override fun instant(): Instant = Instant.ofEpochMilli(millis())
- override fun millis(): Long = _time
+ override fun millis(): Long = clock.time
- override fun toString(): String = "SimulationCoroutineDispatcher.VirtualClock[time=$_time]"
+ override fun toString(): String = "SimulationCoroutineDispatcher.ClockAdapter[time=${clock.time}]"
}
/**
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
deleted file mode 100644
index b02426e3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.FlowCountersImpl
-
-/**
- * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
- */
-public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer {
- /**
- * A flag to indicate that the flow consumer is active.
- */
- public override val isActive: Boolean
- get() = ctx != null
-
- /**
- * The capacity of the consumer.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- ctx?.capacity = value
- }
-
- /**
- * The current processing rate of the consumer.
- */
- public override val rate: Double
- get() = ctx?.rate ?: 0.0
-
- /**
- * The flow processing rate demand at this instant.
- */
- public override val demand: Double
- get() = ctx?.demand ?: 0.0
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public override val counters: FlowCounters
- get() = _counters
- private val _counters = FlowCountersImpl()
-
- /**
- * The [FlowConsumerContext] that is currently running.
- */
- protected var ctx: FlowConsumerContext? = null
- private set
-
- /**
- * Construct the [FlowConsumerLogic] instance for a new source.
- */
- protected abstract fun createLogic(): FlowConsumerLogic
-
- /**
- * Start the specified [FlowConsumerContext].
- */
- protected open fun start(ctx: FlowConsumerContext) {
- ctx.start()
- }
-
- /**
- * The previous demand for the consumer.
- */
- private var _previousDemand = 0.0
- private var _previousCapacity = 0.0
-
- /**
- * Update the counters of the flow consumer.
- */
- protected fun updateCounters(ctx: FlowConnection, delta: Long) {
- val demand = _previousDemand
- val capacity = _previousCapacity
-
- _previousDemand = ctx.demand
- _previousCapacity = ctx.capacity
-
- if (delta <= 0) {
- return
- }
-
- val counters = _counters
- val deltaS = delta / 1000.0
- val total = demand * deltaS
- val work = capacity * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.demand += work
- counters.actual += actualWork
- counters.remaining += (total - actualWork)
- }
-
- /**
- * Update the counters of the flow consumer.
- */
- protected fun updateCounters(demand: Double, actual: Double, remaining: Double) {
- val counters = _counters
- counters.demand += demand
- counters.actual += actual
- counters.remaining += remaining
- }
-
- final override fun startConsumer(source: FlowSource) {
- check(ctx == null) { "Consumer is in invalid state" }
- val ctx = engine.newContext(source, createLogic())
-
- ctx.capacity = capacity
- this.ctx = ctx
-
- start(ctx)
- }
-
- final override fun pull() {
- ctx?.pull()
- }
-
- final override fun cancel() {
- val ctx = ctx
- if (ctx != null) {
- this.ctx = null
- ctx.close()
- }
- }
-
- override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]"
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
index c327e1e9..8ff0bc76 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -52,6 +52,13 @@ public interface FlowConnection : AutoCloseable {
public fun pull()
/**
+ * Pull the source.
+ *
+ * @param now The timestamp at which the connection is pulled.
+ */
+ public fun pull(now: Long)
+
+ /**
* Push the given flow [rate] over this connection.
*
* @param rate The rate of the flow to push.
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
index d7182497..98922ab3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -55,6 +55,8 @@ public interface FlowConsumerContext : FlowConnection {
/**
* Synchronously pull the source of the connection.
+ *
+ * @param now The timestamp at which the connection is pulled.
*/
- public fun pullSync()
+ public fun pullSync(now: Long)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
index 7eaaf6c2..e3bdd7ba 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -23,7 +23,8 @@
package org.opendc.simulator.flow
import mu.KotlinLogging
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
/**
@@ -71,6 +72,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
_innerCtx?.pull()
}
+ override fun pull(now: Long) {
+ _innerCtx?.pull(now)
+ }
+
@JvmField var lastPull = Long.MAX_VALUE
override fun push(rate: Double) {
@@ -117,7 +122,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
override val counters: FlowCounters
get() = _counters
- private val _counters = FlowCountersImpl()
+ private val _counters = MutableFlowCounters()
override fun startConsumer(source: FlowSource) {
check(delegate == null) { "Forwarder already active" }
@@ -241,12 +246,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled
}
val counters = _counters
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
val total = ctx.capacity * deltaS
val work = _demand * deltaS
val actualWork = ctx.rate * deltaS
- counters.demand += work
- counters.actual += actualWork
- counters.remaining += (total - actualWork)
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index b4eb6a38..e9094443 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -22,6 +22,9 @@
package org.opendc.simulator.flow
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
+
/**
* A [FlowSink] represents a sink with a fixed capacity.
*
@@ -33,38 +36,120 @@ public class FlowSink(
private val engine: FlowEngine,
initialCapacity: Double,
private val parent: FlowConvergenceListener? = null
-) : AbstractFlowConsumer(engine, initialCapacity) {
+) : FlowConsumer {
+ /**
+ * A flag to indicate that the flow consumer is active.
+ */
+ public override val isActive: Boolean
+ get() = _ctx != null
+
+ /**
+ * The capacity of the consumer.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ _ctx?.capacity = value
+ }
+
+ /**
+ * The current processing rate of the consumer.
+ */
+ public override val rate: Double
+ get() = _ctx?.rate ?: 0.0
+
+ /**
+ * The flow processing rate demand at this instant.
+ */
+ public override val demand: Double
+ get() = _ctx?.demand ?: 0.0
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
+
+ /**
+ * The current active [FlowConsumerLogic] of this sink.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ override fun startConsumer(source: FlowSource) {
+ check(_ctx == null) { "Consumer is in invalid state" }
- override fun start(ctx: FlowConsumerContext) {
+ val ctx = engine.newContext(source, Logic(parent, _counters))
+ _ctx = ctx
+
+ ctx.capacity = capacity
if (parent != null) {
ctx.shouldConsumerConverge = true
}
- super.start(ctx)
+
+ ctx.start()
}
- override fun createLogic(): FlowConsumerLogic {
- return object : FlowConsumerLogic {
- private val parent = this@FlowSink.parent
-
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- delta: Long,
- rate: Double
- ) {
- updateCounters(ctx, delta)
- }
+ override fun pull() {
+ _ctx?.pull()
+ }
- override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- updateCounters(ctx, delta)
- cancel()
- }
+ override fun cancel() {
+ _ctx?.close()
+ }
+
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
+
+ /**
+ * [FlowConsumerLogic] of a sink.
+ */
+ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ updateCounters(ctx, delta, rate, ctx.capacity)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ updateCounters(ctx, delta, 0.0, 0.0)
+
+ _ctx = null
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now, delta)
+ }
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- parent?.onConverge(now, delta)
+ /**
+ * The previous demand and capacity for the consumer.
+ */
+ private val _previous = DoubleArray(2)
+
+ /**
+ * Update the counters of the flow consumer.
+ */
+ private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ val counters = counters
+ val previous = _previous
+ val demand = previous[0]
+ val capacity = previous[1]
+
+ previous[0] = nextDemand
+ previous[1] = nextCapacity
+
+ if (delta <= 0) {
+ return
}
+
+ val deltaS = delta * D_MS_TO_S
+ val total = demand * deltaS
+ val work = capacity * deltaS
+ val actualWork = ctx.rate * deltaS
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
}
}
-
- override fun toString(): String = "FlowSink[capacity=$capacity]"
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
new file mode 100644
index 00000000..450195ec
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Constant for converting milliseconds into seconds.
+ */
+internal const val D_MS_TO_S = 1 / 1000.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 9a568897..58ca918b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl(
/**
* The timers at which the context is scheduled to be interrupted.
*/
- private var _timer: FlowEngineImpl.Timer? = null
- private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5)
+ private var _timer: Long = Long.MAX_VALUE
+ private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
override fun start() {
check(_flags and ConnState == ConnPending) { "Consumer is already started" }
@@ -164,17 +164,21 @@ internal class FlowConsumerContextImpl(
}
}
- override fun pull() {
+ override fun pull(now: Long) {
val flags = _flags
if (flags and ConnState != ConnActive) {
return
}
// Mark connection as pulled
- scheduleImmediate(_clock.millis(), flags or ConnPulled)
+ scheduleImmediate(now, flags or ConnPulled)
}
- override fun pullSync() {
+ override fun pull() {
+ pull(_clock.millis())
+ }
+
+ override fun pullSync(now: Long) {
val flags = _flags
// Do not attempt to flush the connection if the connection is closed or an update is already active
@@ -182,8 +186,6 @@ internal class FlowConsumerContextImpl(
return
}
- val now = _clock.millis()
-
if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
engine.scheduleSync(now, this)
}
@@ -217,8 +219,8 @@ internal class FlowConsumerContextImpl(
*/
fun doUpdate(
now: Long,
- visited: ArrayDeque<FlowConsumerContextImpl>,
- timerQueue: PriorityQueue<FlowEngineImpl.Timer>,
+ visited: FlowDeque,
+ timerQueue: FlowTimerQueue,
isImmediate: Boolean
) {
var flags = _flags
@@ -326,8 +328,7 @@ internal class FlowConsumerContextImpl(
// Prune the head timer if this is a delayed update
val timer = if (!isImmediate) {
// Invariant: Any pending timer should only point to a future timestamp
- // See also `scheduleDelayed`
- val timer = pendingTimers.poll()
+ val timer = pendingTimers.poll() ?: Long.MAX_VALUE
_timer = timer
timer
} else {
@@ -342,7 +343,7 @@ internal class FlowConsumerContextImpl(
if (newDeadline == Long.MAX_VALUE ||
flags and ConnState != ConnActive ||
flags and ConnDisableTimers != 0 ||
- (timer != null && newDeadline >= timer.target)
+ (timer != Long.MAX_VALUE && newDeadline >= timer)
) {
// Ignore any deadline scheduled at the maximum value
// This indicates that the source does not want to register a timer
@@ -350,12 +351,11 @@ internal class FlowConsumerContextImpl(
}
// Construct a timer for the new deadline and add it to the global queue of timers
- val newTimer = FlowEngineImpl.Timer(this, newDeadline)
- _timer = newTimer
- timerQueue.add(newTimer)
+ _timer = newDeadline
+ timerQueue.add(this, newDeadline)
- // A timer already exists for this connection, so add it to the queue of pending timers
- if (timer != null) {
+ // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
+ if (timer != Long.MAX_VALUE) {
pendingTimers.addFirst(timer)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
new file mode 100644
index 00000000..c6cba4b7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import java.util.*
+
+/**
+ * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations.
+ */
+internal class FlowDeque(initialCapacity: Int = 256) {
+ /**
+ * The array of elements in the queue.
+ */
+ private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
+ private var _head = 0
+ private var _tail = 0
+
+ /**
+ * Determine whether this queue is not empty.
+ */
+ fun isNotEmpty(): Boolean {
+ return _head != _tail
+ }
+
+ /**
+ * Add the specified [ctx] to the queue.
+ */
+ fun add(ctx: FlowConsumerContextImpl) {
+ val es = _elements
+ var tail = _tail
+
+ es[tail] = ctx
+
+ tail = inc(tail, es.size)
+ _tail = tail
+
+ if (_head == tail) {
+ doubleCapacity()
+ }
+ }
+
+ /**
+ * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
+ */
+ fun poll(): FlowConsumerContextImpl? {
+ val es = _elements
+ val head = _head
+ val ctx = es[head]
+
+ if (ctx != null) {
+ es[head] = null
+ _head = inc(head, es.size)
+ }
+
+ return ctx
+ }
+
+ /**
+ * Clear the queue.
+ */
+ fun clear() {
+ _elements.fill(null)
+ _head = 0
+ _tail = 0
+ }
+
+ private fun inc(i: Int, modulus: Int): Int {
+ var x = i
+ if (++x >= modulus) {
+ x = 0
+ }
+ return x
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private fun doubleCapacity() {
+ assert(_head == _tail)
+ val p = _head
+ val n = _elements.size
+ val r = n - p // number of elements to the right of p
+
+ val newCapacity = n shl 1
+ check(newCapacity >= 0) { "Sorry, deque too big" }
+
+ val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
+
+ _elements.copyInto(a, 0, p, r)
+ _elements.copyInto(a, r, 0, p)
+
+ _elements = a
+ _head = 0
+ _tail = n
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index a9234abf..3c79d54e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
-internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable {
+internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable {
/**
* The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
@@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/**
* The queue of connection updates that are scheduled for immediate execution.
*/
- private val queue = ArrayDeque<FlowConsumerContextImpl>()
+ private val queue = FlowDeque()
/**
* A priority queue containing the connection updates to be scheduled in the future.
*/
- private val futureQueue = PriorityQueue<Timer>()
+ private val futureQueue = FlowTimerQueue()
/**
* The stack of engine invocations to occur in the future.
@@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/**
* The systems that have been visited during the engine cycle.
*/
- private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque()
+ private val visited = FlowDeque()
/**
* The index in the batch stack.
@@ -71,6 +71,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
private var batchIndex = 0
/**
+ * The virtual [Clock] of this engine.
+ */
+ override val clock: Clock
+ get() = _clock
+ private val _clock: Clock = clock
+
+ /**
* Update the specified [ctx] synchronously.
*/
fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
@@ -113,7 +120,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
try {
// Flush the work if the engine is not already running
if (batchIndex == 1 && queue.isNotEmpty()) {
- doRunEngine(clock.millis())
+ doRunEngine(_clock.millis())
}
} finally {
batchIndex--
@@ -122,11 +129,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/* Runnable */
override fun run() {
- val now = clock.millis()
val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
- assert(now >= invocation.timestamp) { "Future invocations invariant violated" }
-
- doRunEngine(now)
+ doRunEngine(invocation.timestamp)
}
/**
@@ -144,17 +148,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
// Execute all scheduled updates at current timestamp
while (true) {
- val timer = futureQueue.peek() ?: break
- val target = timer.target
-
- if (target > now) {
- break
- }
-
- assert(target >= now) { "Internal inconsistency: found update of the past" }
-
- futureQueue.poll()
- timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
+ val ctx = futureQueue.poll(now) ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
}
// Repeat execution of all immediate updates until the system has converged to a steady-state
@@ -177,9 +172,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
}
// Schedule an engine invocation for the next update to occur.
- val headTimer = futureQueue.peek()
- if (headTimer != null) {
- trySchedule(now, futureInvocations, headTimer.target)
+ val headDeadline = futureQueue.peekDeadline()
+ if (headDeadline != Long.MAX_VALUE) {
+ trySchedule(now, futureInvocations, headDeadline)
}
}
@@ -217,17 +212,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
*/
fun cancel() = handle.dispose()
}
-
- /**
- * An update call for [ctx] that is scheduled for [target].
- *
- * This class represents an update in the future at [target] requested by [ctx].
- */
- class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> {
- override fun compareTo(other: Timer): Int {
- return target.compareTo(other.target)
- }
-
- override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]"
- }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
new file mode 100644
index 00000000..22a390e6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Specialized priority queue for flow timers.
+ */
+internal class FlowTimerQueue(initialCapacity: Int = 256) {
+ /**
+ * The binary heap of deadlines.
+ */
+ private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
+
+ /**
+ * The binary heap of [FlowConsumerContextImpl]s.
+ */
+ private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private var size = 0
+
+ /**
+ * Register a timer for [ctx] with [deadline].
+ */
+ fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
+ val i = size
+ val deadlines = _deadlines
+ if (i >= deadlines.size) {
+ grow()
+ }
+
+ siftUp(deadlines, _pending, i, ctx, deadline)
+
+ size = i + 1
+ }
+
+ /**
+ * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
+ */
+ fun poll(now: Long): FlowConsumerContextImpl? {
+ if (size == 0) {
+ return null
+ }
+
+ val deadlines = _deadlines
+ val deadline = deadlines[0]
+
+ if (now < deadline) {
+ return null
+ }
+
+ val pending = _pending
+ val res = pending[0]
+ val s = --size
+
+ val nextDeadline = deadlines[s]
+ val next = pending[s]!!
+
+ // Clear the last element of the queue
+ pending[s] = null
+ deadlines[s] = Long.MIN_VALUE
+
+ if (s != 0) {
+ siftDown(deadlines, pending, next, nextDeadline)
+ }
+
+ return res
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ */
+ fun peekDeadline(): Long {
+ return if (size == 0) Long.MAX_VALUE else _deadlines[0]
+ }
+
+ /**
+ * Increases the capacity of the array.
+ */
+ private fun grow() {
+ val oldCapacity = _deadlines.size
+ // Double size if small; else grow by 50%
+ val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
+
+ _deadlines = _deadlines.copyOf(newCapacity)
+ _pending = _pending.copyOf(newCapacity)
+ }
+
+ /**
+ * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
+ * greater than or equal to its parent, or is the root.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param pos The position to fill.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftUp(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ pos: Int,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = pos
+
+ while (k > 0) {
+ val parent = (k - 1) ushr 1
+ val parentDeadline = deadlines[parent]
+
+ if (deadline >= parentDeadline) {
+ break
+ }
+
+ deadlines[k] = parentDeadline
+ pending[k] = pending[parent]
+
+ k = parent
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+
+ /**
+ * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
+ * is less than or equal to its children or is a leaf.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftDown(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = 0
+ val size = size
+ val half = size ushr 1
+
+ while (k < half) {
+ var child = (k shl 1) + 1
+
+ var childDeadline = deadlines[child]
+ val right = child + 1
+
+ if (right < size) {
+ val rightDeadline = deadlines[right]
+
+ if (childDeadline > rightDeadline) {
+ child = right
+ childDeadline = rightDeadline
+ }
+ }
+
+ if (deadline <= childDeadline) {
+ break
+ }
+
+ deadlines[k] = childDeadline
+ pending[k] = pending[child]
+
+ k = child
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
index d2fa5228..d990dc61 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
@@ -27,17 +27,27 @@ import org.opendc.simulator.flow.FlowCounters
/**
* Mutable implementation of the [FlowCounters] interface.
*/
-internal class FlowCountersImpl : FlowCounters {
- override var demand: Double = 0.0
- override var actual: Double = 0.0
- override var remaining: Double = 0.0
- override var interference: Double = 0.0
+public class MutableFlowCounters : FlowCounters {
+ override val demand: Double
+ get() = _counters[0]
+ override val actual: Double
+ get() = _counters[1]
+ override val remaining: Double
+ get() = _counters[2]
+ override val interference: Double
+ get() = _counters[3]
+ private val _counters = DoubleArray(4)
override fun reset() {
- demand = 0.0
- actual = 0.0
- remaining = 0.0
- interference = 0.0
+ _counters.fill(0.0)
+ }
+
+ public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) {
+ val counters = _counters
+ counters[0] += demand
+ counters[1] += actual
+ counters[2] += remaining
+ counters[3] += interference
}
override fun toString(): String {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 97059e93..a0fb8a4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -25,7 +25,8 @@ package org.opendc.simulator.flow.mux
import org.opendc.simulator.flow.*
import org.opendc.simulator.flow.interference.InterferenceDomain
import org.opendc.simulator.flow.interference.InterferenceKey
-import org.opendc.simulator.flow.internal.FlowCountersImpl
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
import kotlin.math.max
import kotlin.math.min
@@ -85,7 +86,7 @@ public class MaxMinFlowMultiplexer(
private val scheduler = Scheduler(engine, parent)
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val provider = Input(engine, scheduler, interferenceDomain, key)
+ val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity)
_inputs.add(provider)
return provider
}
@@ -135,11 +136,11 @@ public class MaxMinFlowMultiplexer(
/**
* Helper class containing the scheduler state.
*/
- private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) {
+ private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
/**
* The flow counters of this scheduler.
*/
- @JvmField val counters = FlowCountersImpl()
+ @JvmField val counters = MutableFlowCounters()
/**
* The flow rate of the multiplexer.
@@ -167,6 +168,11 @@ public class MaxMinFlowMultiplexer(
private val _activeInputs = mutableListOf<Input>()
/**
+ * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
+ */
+ private var _inputArray = emptyArray<Input>()
+
+ /**
* The active outputs registered with the scheduler.
*/
private val _activeOutputs = mutableListOf<Output>()
@@ -184,10 +190,16 @@ public class MaxMinFlowMultiplexer(
private var _lastConvergeInput: Input? = null
/**
+ * The simulation clock.
+ */
+ private val _clock = engine.clock
+
+ /**
* Register the specified [input] to this scheduler.
*/
fun registerInput(input: Input) {
_activeInputs.add(input)
+ _inputArray = _activeInputs.toTypedArray()
val hasActivationOutput = activationOutput != null
@@ -195,7 +207,8 @@ public class MaxMinFlowMultiplexer(
input.shouldConsumerConverge = !hasActivationOutput
input.enableTimers = !hasActivationOutput
input.capacity = capacity
- trigger(engine.clock.millis())
+
+ trigger(_clock.millis())
}
/**
@@ -207,6 +220,8 @@ public class MaxMinFlowMultiplexer(
_lastConvergeInput = null
}
+ _activeInputs.remove(input)
+
// Re-run scheduler to distribute new load
trigger(now)
}
@@ -287,7 +302,7 @@ public class MaxMinFlowMultiplexer(
// a few inputs and little changes at the same timestamp.
// We always pick for option (1) unless there are no outputs available.
if (activationOutput != null) {
- activationOutput.pull()
+ activationOutput.pull(now)
return
} else {
runScheduler(now)
@@ -305,7 +320,7 @@ public class MaxMinFlowMultiplexer(
return try {
_schedulerActive = true
- doRunScheduler(delta)
+ doRunScheduler(now, delta)
} finally {
_schedulerActive = false
}
@@ -356,15 +371,17 @@ public class MaxMinFlowMultiplexer(
*
* @return The deadline after which a new scheduling cycle should start.
*/
- private fun doRunScheduler(delta: Long): Long {
+ private fun doRunScheduler(now: Long, delta: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ var inputArray = _inputArray
+ var inputSize = _inputArray.size
// Update the counters of the scheduler
updateCounters(delta)
// If there is no work yet, mark the inputs as idle.
- if (activeInputs.isEmpty()) {
+ if (inputSize == 0) {
demand = 0.0
rate = 0.0
return Long.MAX_VALUE
@@ -372,53 +389,70 @@ public class MaxMinFlowMultiplexer(
val capacity = capacity
var availableCapacity = capacity
+ var deadline = Long.MAX_VALUE
+ var demand = 0.0
+ var shouldRebuild = false
- // Pull in the work of the outputs
- val inputIterator = activeInputs.listIterator()
- for (input in inputIterator) {
- input.pullSync()
+ // Pull in the work of the inputs
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
- // Remove outputs that have finished
+ input.pullSync(now)
+
+ // Remove inputs that have finished
if (!input.isActive) {
input.actualRate = 0.0
- inputIterator.remove()
+ shouldRebuild = true
+ } else {
+ demand += input.limit
+ deadline = min(deadline, input.deadline)
}
}
- var demand = 0.0
- var deadline = Long.MAX_VALUE
+ // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
+ if (shouldRebuild) {
+ inputArray = activeInputs.toTypedArray()
+ inputSize = inputArray.size
+ _inputArray = inputArray
+ }
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- activeInputs.sort()
+ val rate = if (demand > capacity) {
+ // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
+ // constrained capacity across the inputs.
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- val size = activeInputs.size
- for (i in activeInputs.indices) {
- val input = activeInputs[i]
- val availableShare = availableCapacity / (size - i)
- val grantedRate = min(input.allowedRate, availableShare)
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ inputArray.sort()
- demand += input.limit
- deadline = min(deadline, input.deadline)
- availableCapacity -= grantedRate
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
+ val availableShare = availableCapacity / (inputSize - i)
+ val grantedRate = min(input.allowedRate, availableShare)
- input.actualRate = grantedRate
- }
+ availableCapacity -= grantedRate
+ input.actualRate = grantedRate
+ }
- val rate = capacity - availableCapacity
+ capacity - availableCapacity
+ } else {
+ demand
+ }
this.demand = demand
- this.rate = rate
-
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- output.push(grantedSpeed)
+ if (this.rate != rate) {
+ // Only update the outputs if the output rate has changed
+ this.rate = rate
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
}
return deadline
@@ -440,11 +474,16 @@ public class MaxMinFlowMultiplexer(
return
}
- val deltaS = delta / 1000.0
+ val deltaS = delta * D_MS_TO_S
+ val demand = demand
+ val rate = rate
- counters.demand += demand * deltaS
- counters.actual += rate * deltaS
- counters.remaining += (previousCapacity - rate) * deltaS
+ counters.increment(
+ demand = demand * deltaS,
+ actual = rate * deltaS,
+ remaining = (previousCapacity - rate) * deltaS,
+ interference = 0.0
+ )
}
}
@@ -452,32 +491,48 @@ public class MaxMinFlowMultiplexer(
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private class Input(
- engine: FlowEngine,
+ private val engine: FlowEngine,
private val scheduler: Scheduler,
private val interferenceDomain: InterferenceDomain?,
- @JvmField val key: InterferenceKey?
- ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> {
+ @JvmField val key: InterferenceKey?,
+ initialCapacity: Double,
+ ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
/**
- * The requested limit.
+ * A flag to indicate that the consumer is active.
*/
- @JvmField var limit: Double = 0.0
+ override val isActive: Boolean
+ get() = _ctx != null
/**
- * The actual processing speed.
+ * The demand of the consumer.
*/
- @JvmField var actualRate: Double = 0.0
+ override val demand: Double
+ get() = limit
/**
- * The deadline of the input.
+ * The processing rate of the consumer.
*/
- val deadline: Long
- get() = ctx?.deadline ?: Long.MAX_VALUE
+ override val rate: Double
+ get() = actualRate
/**
- * The processing rate that is allowed by the model constraints.
+ * The capacity of the input.
*/
- val allowedRate: Double
- get() = min(capacity, limit)
+ override var capacity: Double
+ get() = _capacity
+ set(value) {
+ allowedRate = min(limit, value)
+ _capacity = value
+ _ctx?.capacity = value
+ }
+ private var _capacity = initialCapacity
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
/**
* A flag to enable timers for the input.
@@ -485,7 +540,7 @@ public class MaxMinFlowMultiplexer(
var enableTimers: Boolean = true
set(value) {
field = value
- ctx?.enableTimers = value
+ _ctx?.enableTimers = value
}
/**
@@ -494,10 +549,36 @@ public class MaxMinFlowMultiplexer(
var shouldConsumerConverge: Boolean = true
set(value) {
field = value
- ctx?.shouldConsumerConverge = value
+ _ctx?.shouldConsumerConverge = value
}
/**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ @JvmField var allowedRate: Double = 0.0
+
+ /**
+ * The deadline of the input.
+ */
+ val deadline: Long
+ get() = _ctx?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * The [FlowConsumerContext] that is currently running.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ /**
* A flag to indicate that the input is closed.
*/
private var _isClosed: Boolean = false
@@ -512,13 +593,33 @@ public class MaxMinFlowMultiplexer(
cancel()
}
- /* AbstractFlowConsumer */
- override fun createLogic(): FlowConsumerLogic = this
+ /**
+ * Pull the source if necessary.
+ */
+ fun pullSync(now: Long) {
+ _ctx?.pullSync(now)
+ }
- override fun start(ctx: FlowConsumerContext) {
+ /* FlowConsumer */
+ override fun startConsumer(source: FlowSource) {
check(!_isClosed) { "Cannot re-use closed input" }
+ check(_ctx == null) { "Consumer is in invalid state" }
+
+ val ctx = engine.newContext(source, this)
+ _ctx = ctx
+
+ ctx.capacity = capacity
scheduler.registerInput(this)
- super.start(ctx)
+
+ ctx.start()
+ }
+
+ override fun pull() {
+ _ctx?.pull()
+ }
+
+ override fun cancel() {
+ _ctx?.close()
}
/* FlowConsumerLogic */
@@ -530,8 +631,10 @@ public class MaxMinFlowMultiplexer(
) {
doUpdateCounters(delta)
- actualRate = 0.0
+ val allowed = min(rate, capacity)
limit = rate
+ actualRate = allowed
+ allowedRate = allowed
scheduler.trigger(now)
}
@@ -541,11 +644,11 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
+ allowedRate = 0.0
scheduler.deregisterInput(this, now)
- // BUG: Cancel the connection so that `ctx` is set to `null`
- cancel()
+ _ctx = null
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
@@ -556,13 +659,6 @@ public class MaxMinFlowMultiplexer(
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
/**
- * Pull the source if necessary.
- */
- fun pullSync() {
- ctx?.pullSync()
- }
-
- /**
* Helper method to update the flow counters of the multiplexer.
*/
private fun doUpdateCounters(delta: Long) {
@@ -578,14 +674,16 @@ public class MaxMinFlowMultiplexer(
1.0
}
- val deltaS = delta / 1000.0
+ val actualRate = actualRate
+
+ val deltaS = delta * D_MS_TO_S
val demand = limit * deltaS
val actual = actualRate * deltaS
- val remaining = (capacity - actualRate) * deltaS
+ val remaining = (_capacity - actualRate) * deltaS
+ val interference = actual * max(0.0, 1 - perfScore)
- updateCounters(demand, actual, remaining)
-
- scheduler.counters.interference += actual * max(0.0, 1 - perfScore)
+ _counters.increment(demand, actual, remaining, interference)
+ scheduler.counters.increment(0.0, 0.0, 0.0, interference)
}
}
@@ -636,8 +734,8 @@ public class MaxMinFlowMultiplexer(
/**
* Pull this output.
*/
- fun pull() {
- _conn?.pull()
+ fun pull(now: Long) {
+ _conn?.pull(now)
}
override fun onStart(conn: FlowConnection, now: Long) {
@@ -675,6 +773,7 @@ public class MaxMinFlowMultiplexer(
// Output is not the activation output, so trigger activation output and do not install timer for this
// output (by returning `Long.MAX_VALUE`)
scheduler.trigger(now)
+
Long.MAX_VALUE
}
}