From 94fa3d33d4ef77aca5e70cc7f91ae9dca71d25e7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 6 Oct 2021 13:12:18 +0200 Subject: perf(simulator): Optimize SimTraceWorkload This change improves the performance of the SimTraceWorkload class by changing the way trace fragments are read and processed by the CPU consumers. --- .../org/opendc/compute/simulator/SimHostTest.kt | 32 +-- .../compute/workload/ComputeWorkloadLoader.kt | 61 ++++-- .../org/opendc/compute/workload/VirtualMachine.kt | 6 +- .../serverless/trace/FunctionTraceWorkload.kt | 5 +- .../simulator/compute/SimMachineBenchmarks.kt | 13 +- .../opendc/simulator/compute/workload/SimTrace.kt | 233 +++++++++++++++++++++ .../simulator/compute/workload/SimTraceFragment.kt | 38 ++++ .../simulator/compute/workload/SimTraceWorkload.kt | 83 +------- .../compute/kernel/SimFairShareHypervisorTest.kt | 52 ++--- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 14 +- .../compute/workload/SimTraceWorkloadTest.kt | 40 ++-- 11 files changed, 405 insertions(+), 172 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 26089b6d..a0ff9228 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -39,6 +39,8 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -105,11 +107,11 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) ), offset = 1 ) @@ -121,11 +123,11 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) ), offset = 1 ) @@ -217,11 +219,11 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 2000L, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) ), offset = 1 ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 7c579e39..1a6624f7 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,13 +23,14 @@ package org.opendc.compute.workload import mu.KotlinLogging -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimTrace import org.opendc.trace.* import java.io.File import java.time.Duration import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap +import kotlin.math.max import kotlin.math.roundToLong /** @@ -51,7 +52,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the fragments into memory. */ - private fun parseFragments(trace: Trace): Map> { + private fun parseFragments(trace: Trace): Map { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val idCol = reader.resolve(RESOURCE_ID) @@ -60,7 +61,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val coresCol = reader.resolve(RESOURCE_CPU_COUNT) val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) - val fragments = mutableMapOf>() + val fragments = mutableMapOf() return try { while (reader.nextRow()) { @@ -70,14 +71,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val fragment = SimTraceWorkload.Fragment( - time.toEpochMilli(), - duration.toMillis(), - cpuUsage, - cores - ) - - fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment) + val timeMs = time.toEpochMilli() + val deadlineMs = timeMs + duration.toMillis() + val builder = fragments.computeIfAbsent(id) { Builder() } + builder.add(timeMs, deadlineMs, cpuUsage, cores) } fragments @@ -89,7 +86,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(trace: Trace, fragments: Map>): List { + private fun parseMeta(trace: Trace, fragments: Map): List { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() val idCol = reader.resolve(RESOURCE_ID) @@ -115,8 +112,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs + val builder = fragments.getValue(id) + val totalLoad = builder.totalLoad entries.add( VirtualMachine( @@ -127,7 +124,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { totalLoad, submissionTime, endTime, - vmFragments + builder.build() ) ) } @@ -165,4 +162,38 @@ public class ComputeWorkloadLoader(private val baseDir: File) { public fun reset() { cache.clear() } + + /** + * A builder for a VM trace. + */ + private class Builder { + /** + * The total load of the trace. + */ + @JvmField var totalLoad: Double = 0.0 + + /** + * The internal builder for the trace. + */ + private val builder = SimTrace.builder() + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val duration = max(0, deadline - timestamp) + totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + builder.add(timestamp, deadline, usage, cores) + } + + /** + * Build the trace. + */ + fun build(): SimTrace = builder.build() + } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 40484b68..5dd239f6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -22,7 +22,7 @@ package org.opendc.compute.workload -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimTrace import java.time.Instant import java.util.* @@ -35,7 +35,7 @@ import java.util.* * @param memCapacity The provisioned memory for the VM. * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. - * @param trace The trace fragments that belong to this VM. + * @param trace The trace that belong to this VM. */ public data class VirtualMachine( val uid: UUID, @@ -45,5 +45,5 @@ public data class VirtualMachine( val totalLoad: Double, val startTime: Instant, val stopTime: Instant, - val trace: Sequence, + val trace: SimTrace, ) diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt index a119a219..bbe130e3 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt @@ -23,12 +23,15 @@ package org.opendc.experiments.serverless.trace import org.opendc.faas.simulator.workload.SimFaaSWorkload +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload /** * A [SimFaaSWorkload] for a [FunctionTrace]. */ -public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) { +class FunctionTraceWorkload(trace: FunctionTrace) : + SimFaaSWorkload, SimWorkload by SimTraceWorkload(SimTrace.ofFragments(trace.samples.map { SimTraceFragment(it.timestamp, it.duration, it.cpuUsage, 1) })) { override suspend fun invoke() {} } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index b8e0227a..cb52d24f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import javafx.application.Application.launch import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -34,6 +33,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var machineModel: MachineModel - private lateinit var trace: Sequence + private lateinit var trace: SimTrace @Setup fun setUp() { @@ -60,8 +60,13 @@ class SimMachineBenchmarks { ) val random = ThreadLocalRandom.current() - val entries = List(10000) { SimTraceWorkload.Fragment(it * 1000L, 1000, random.nextDouble(0.0, 4500.0), 1) } - trace = entries.asSequence() + val builder = SimTrace.builder() + repeat(10000) { + val timestamp = it.toLong() + val deadline = timestamp + 1000 + builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1) + } + trace = builder.build() } @Benchmark diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt new file mode 100644 index 00000000..4f567b55 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource +import kotlin.math.min + +/** + * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s. + * + * @param usageCol The column containing the CPU usage of each fragment (in MHz). + * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis). + * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis). + * @param coresCol The column containing the utilized cores. + * @param size The number of fragments in the trace. + */ +public class SimTrace( + private val usageCol: DoubleArray, + private val timestampCol: LongArray, + private val deadlineCol: LongArray, + private val coresCol: IntArray, + private val size: Int, +) { + init { + require(size >= 0) { "Invalid trace size" } + require(usageCol.size >= size) { "Invalid number of usage entries" } + require(timestampCol.size >= size) { "Invalid number of timestamp entries" } + require(deadlineCol.size >= size) { "Invalid number of deadline entries" } + require(coresCol.size >= size) { "Invalid number of core entries" } + } + + public companion object { + /** + * Construct a [SimTrace] with the specified fragments. + */ + public fun ofFragments(fragments: List): SimTrace { + val size = fragments.size + val usageCol = DoubleArray(size) + val timestampCol = LongArray(size) + val deadlineCol = LongArray(size) + val coresCol = IntArray(size) + + for (i in fragments.indices) { + val fragment = fragments[i] + usageCol[i] = fragment.usage + timestampCol[i] = fragment.timestamp + deadlineCol[i] = fragment.timestamp + fragment.duration + coresCol[i] = fragment.cores + } + + return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * Construct a [SimTrace] with the specified fragments. + */ + @JvmStatic + public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace { + val size = fragments.size + val usageCol = DoubleArray(size) + val timestampCol = LongArray(size) + val deadlineCol = LongArray(size) + val coresCol = IntArray(size) + + for (i in fragments.indices) { + val fragment = fragments[i] + usageCol[i] = fragment.usage + timestampCol[i] = fragment.timestamp + deadlineCol[i] = fragment.timestamp + fragment.duration + coresCol[i] = fragment.cores + } + + return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * Create a [SimTrace.Builder] instance. + */ + @JvmStatic + public fun builder(): Builder = Builder() + } + + /** + * Construct a new [FlowSource] for the specified [cpu]. + */ + public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource { + return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * A builder class for a [SimTrace]. + */ + public class Builder internal constructor() { + /** + * The columns of the trace. + */ + private var usageCol: DoubleArray = DoubleArray(16) + private var timestampCol: LongArray = LongArray(16) + private var deadlineCol: LongArray = LongArray(16) + private var coresCol: IntArray = IntArray(16) + + /** + * The number of entries in the trace. + */ + private var size = 0 + + /** + * Add the specified [SimTraceFragment] to the trace. + */ + public fun add(fragment: SimTraceFragment) { + add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores) + } + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val size = size + + if (size == usageCol.size) { + grow() + } + + timestampCol[size] = timestamp + deadlineCol[size] = deadline + usageCol[size] = usage + coresCol[size] = cores + + this.size++ + } + + /** + * Helper function to grow the capacity of the column arrays. + */ + private fun grow() { + val arraySize = usageCol.size + val newSize = arraySize * 2 + + usageCol = usageCol.copyOf(newSize) + timestampCol = timestampCol.copyOf(newSize) + deadlineCol = deadlineCol.copyOf(newSize) + coresCol = coresCol.copyOf(newSize) + } + + /** + * Construct the immutable [SimTrace]. + */ + public fun build(): SimTrace { + return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + } + } + + /** + * A CPU consumer for the trace workload. + */ + private class CpuConsumer( + cpu: ProcessingUnit, + private val offset: Long, + private val usageCol: DoubleArray, + private val timestampCol: LongArray, + private val deadlineCol: LongArray, + private val coresCol: IntArray, + private val size: Int + ) : FlowSource { + private val id = cpu.id + private val coreCount = cpu.node.coreCount + + /** + * The index in the trace. + */ + private var _idx = 0 + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val size = size + val nowOffset = now - offset + + var idx = _idx + val deadlines = deadlineCol + var deadline = deadlines[idx] + + while (deadline <= nowOffset && ++idx < size) { + deadline = deadlines[idx] + } + + if (idx >= size) { + conn.close() + return Long.MAX_VALUE + } + + _idx = idx + val timestamp = timestampCol[idx] + + // Fragment is in the future + if (timestamp > nowOffset) { + conn.push(0.0) + return timestamp - nowOffset + } + + val cores = min(coreCount, coresCol[idx]) + val usage = usageCol[idx] + + conn.push(if (id < cores) usage / cores else 0.0) + return deadline - nowOffset + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt new file mode 100644 index 00000000..5285847f --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +/** + * A fragment of the workload trace. + * + * @param timestamp The timestamp at which the fragment starts (in epoch millis). + * @param duration The duration of the fragment (in milliseconds). + * @param usage The CPU usage during the fragment (in MHz). + * @param cores The amount of cores utilized during the fragment. + */ +public data class SimTraceFragment( + @JvmField val timestamp: Long, + @JvmField val duration: Long, + @JvmField val usage: Double, + @JvmField val cores: Int +) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 49ae5933..53c98409 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -23,10 +23,6 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource -import kotlin.math.min /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -35,89 +31,14 @@ import kotlin.math.min * @param trace The trace of fragments to use. * @param offset The offset for the timestamps. */ -public class SimTraceWorkload(public val trace: Sequence, private val offset: Long = 0L) : SimWorkload { - private val iterator = trace.iterator() - private var fragment: Fragment? = null - +public class SimTraceWorkload(private val trace: SimTrace, private val offset: Long = 0L) : SimWorkload { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { - cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model))) + cpu.startConsumer(lifecycle.waitFor(trace.newSource(cpu.model, offset))) } } override fun toString(): String = "SimTraceWorkload" - - /** - * Obtain the fragment with a timestamp equal or greater than [now]. - */ - private fun pullFragment(now: Long): Fragment? { - // Return the most recent fragment if its starting time + duration is later than `now` - var fragment = fragment - if (fragment != null && fragment.timestamp + offset + fragment.duration > now) { - return fragment - } - - while (iterator.hasNext()) { - fragment = iterator.next() - if (fragment.timestamp + offset + fragment.duration > now) { - this.fragment = fragment - return fragment - } - } - - this.fragment = null - return null - } - - private inner class Consumer(cpu: ProcessingUnit) : FlowSource { - private val offset = this@SimTraceWorkload.offset - private val id = cpu.id - private val coreCount = cpu.node.coreCount - - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - val fragment = pullFragment(now) - - if (fragment == null) { - conn.close() - return Long.MAX_VALUE - } - - val timestamp = fragment.timestamp + offset - - // Fragment is in the future - if (timestamp > now) { - conn.push(0.0) - return timestamp - now - } - - val cores = min(coreCount, fragment.cores) - val usage = if (fragment.cores > 0) - fragment.usage / cores - else - 0.0 - val deadline = timestamp + fragment.duration - val duration = deadline - now - - conn.push(if (id < cores && usage > 0.0) usage else 0.0) - - return duration - } - } - - /** - * A fragment of the workload. - * - * @param timestamp The timestamp at which the fragment starts. - * @param duration The duration of the fragment. - * @param usage The CPU usage during the fragment. - * @param cores The amount of cores utilized during the fragment. - */ - public data class Fragment( - @JvmField val timestamp: Long, - @JvmField val duration: Long, - @JvmField val usage: Double, - @JvmField val cores: Int - ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 9db2e6ec..6f32cf46 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -38,6 +38,8 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -66,11 +68,11 @@ internal class SimFairShareHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) @@ -106,20 +108,20 @@ internal class SimFairShareHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3100.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1) ) ) @@ -201,20 +203,20 @@ internal class SimFairShareHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 3100.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1) ) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index b05ffd22..02d308ff 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -36,9 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimRuntimeWorkload -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -66,11 +64,11 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index cdbffe4b..574860e8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -58,11 +58,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 2), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 0 ) @@ -85,11 +85,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 2), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 1000 ) @@ -112,11 +112,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 2), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 0 ) @@ -140,11 +140,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 0), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 0), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 0 ) -- cgit v1.2.3 From a0340a8752c4c4ed8413944b1dfb81b9481b6556 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 6 Oct 2021 14:29:31 +0200 Subject: perf(simulator): Skip fair-share algorithm if capacity remaining This change updates the MaxMinFlowMultiplexer implementation to skip the fair-share algorithm in case the total demand is lower than the available capacity. In this case, no re-division of capacity is necessary. --- .../experiments/capelin/CapelinIntegrationTest.kt | 2 +- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 60 ++++++++++++++-------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 337d68bf..e34c5bdc 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -213,7 +213,7 @@ class CapelinIntegrationTest { { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(481270, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 97059e93..9131eb54 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -372,6 +372,8 @@ public class MaxMinFlowMultiplexer( val capacity = capacity var availableCapacity = capacity + var deadline = Long.MAX_VALUE + var demand = 0.0 // Pull in the work of the outputs val inputIterator = activeInputs.listIterator() @@ -382,32 +384,36 @@ public class MaxMinFlowMultiplexer( if (!input.isActive) { input.actualRate = 0.0 inputIterator.remove() + } else { + demand += input.limit + deadline = min(deadline, input.deadline) } } - var demand = 0.0 - var deadline = Long.MAX_VALUE + val rate = if (demand > capacity) { + // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the + // constrained capacity across the inputs. - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() - // Divide the available output capacity fairly over the inputs using max-min fair sharing - val size = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / (size - i) - val grantedRate = min(input.allowedRate, availableShare) + // Divide the available output capacity fairly over the inputs using max-min fair sharing + val size = activeInputs.size + for (i in activeInputs.indices) { + val input = activeInputs[i] + val availableShare = availableCapacity / (size - i) + val grantedRate = min(input.allowedRate, availableShare) - demand += input.limit - deadline = min(deadline, input.deadline) - availableCapacity -= grantedRate + availableCapacity -= grantedRate + input.actualRate = grantedRate + } - input.actualRate = grantedRate + capacity - availableCapacity + } else { + demand } - val rate = capacity - availableCapacity - this.demand = demand this.rate = rate @@ -467,6 +473,11 @@ public class MaxMinFlowMultiplexer( */ @JvmField var actualRate: Double = 0.0 + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + /** * The deadline of the input. */ @@ -474,10 +485,14 @@ public class MaxMinFlowMultiplexer( get() = ctx?.deadline ?: Long.MAX_VALUE /** - * The processing rate that is allowed by the model constraints. + * The capacity of the input. */ - val allowedRate: Double - get() = min(capacity, limit) + override var capacity: Double + get() = super.capacity + set(value) { + allowedRate = min(limit, value) + super.capacity = value + } /** * A flag to enable timers for the input. @@ -530,8 +545,10 @@ public class MaxMinFlowMultiplexer( ) { doUpdateCounters(delta) - actualRate = 0.0 + val allowed = min(rate, capacity) limit = rate + actualRate = allowed + allowedRate = allowed scheduler.trigger(now) } @@ -541,6 +558,7 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 + allowedRate = 0.0 scheduler.deregisterInput(this, now) -- cgit v1.2.3 From 774ed886ac8f84ae2974c1204534ee332d920864 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 Oct 2021 14:39:03 +0200 Subject: fix(simulator): Count interference for multiplexer inputs This change updates the SimAbstractHypervisor and MaxMinFlowMultiplexer to count interference of multiplexer inputs, instead of only counting them for the scheduler as a whole. --- .../compute/kernel/SimAbstractHypervisor.kt | 90 +++++++------ .../opendc/simulator/flow/AbstractFlowConsumer.kt | 26 +--- .../org/opendc/simulator/flow/FlowForwarder.kt | 9 +- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 12 +- .../simulator/flow/internal/FlowCountersImpl.kt | 46 ------- .../simulator/flow/internal/MutableFlowCounters.kt | 56 ++++++++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 147 ++++++++++++++------- 7 files changed, 225 insertions(+), 161 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index aac8b959..f6d8f628 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -66,45 +66,7 @@ public abstract class SimAbstractHypervisor( */ public override val counters: SimHypervisorCounters get() = _counters - private val _counters = object : SimHypervisorCounters { - @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity - - override var cpuActiveTime: Long = 0L - override var cpuIdleTime: Long = 0L - override var cpuStealTime: Long = 0L - override var cpuLostTime: Long = 0L - - private var _previousDemand = 0.0 - private var _previousActual = 0.0 - private var _previousRemaining = 0.0 - private var _previousInterference = 0.0 - - /** - * Record the CPU time of the hypervisor. - */ - fun record() { - val counters = mux.counters - val demand = counters.demand - val actual = counters.actual - val remaining = counters.remaining - val interference = counters.interference - - val demandDelta = demand - _previousDemand - val actualDelta = actual - _previousActual - val remainingDelta = remaining - _previousRemaining - val interferenceDelta = interference - _previousInterference - - _previousDemand = demand - _previousActual = actual - _previousRemaining = remaining - _previousInterference = interference - - cpuActiveTime += (actualDelta * d).roundToLong() - cpuIdleTime += (remainingDelta * d).roundToLong() - cpuStealTime += ((demandDelta - actualDelta) * d).roundToLong() - cpuLostTime += (interferenceDelta * d).roundToLong() - } - } + private val _counters = CountersImpl(this) /** * The CPU capacity of the hypervisor in MHz. @@ -204,7 +166,7 @@ public abstract class SimAbstractHypervisor( get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() override val cpuStealTime: Long get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long = 0L + override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong() } /** @@ -277,4 +239,52 @@ public abstract class SimAbstractHypervisor( override val min: Double = 0.0 } + + /** + * Implementation of [SimHypervisorCounters]. + */ + private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters { + @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(4) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + val counters = hv.mux.counters + + val demand = counters.demand + val actual = counters.actual + val remaining = counters.remaining + val interference = counters.interference + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + val interferenceDelta = interference - previous[3] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + previous[3] = interference + + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + cpuTime[3] += (interferenceDelta * d).roundToLong() + } + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index b02426e3..5f1057e8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.flow -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters /** * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. @@ -55,13 +55,6 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi public override val demand: Double get() = ctx?.demand ?: 0.0 - /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() - /** * The [FlowConsumerContext] that is currently running. */ @@ -89,7 +82,7 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * Update the counters of the flow consumer. */ - protected fun updateCounters(ctx: FlowConnection, delta: Long) { + protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { val demand = _previousDemand val capacity = _previousCapacity @@ -100,25 +93,12 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi return } - val counters = _counters val deltaS = delta / 1000.0 val total = demand * deltaS val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) - } - - /** - * Update the counters of the flow consumer. - */ - protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters.demand += demand - counters.actual += actual - counters.remaining += remaining + increment(work, actualWork, (total - actualWork), 0.0) } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7eaaf6c2..229fd96a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max /** @@ -117,7 +117,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override val counters: FlowCounters get() = _counters - private val _counters = FlowCountersImpl() + private val _counters = MutableFlowCounters() override fun startConsumer(source: FlowSource) { check(delegate == null) { "Forwarder already active" } @@ -245,8 +245,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index b4eb6a38..170ab1c0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.MutableFlowCounters + /** * A [FlowSink] represents a sink with a fixed capacity. * @@ -34,6 +36,12 @@ public class FlowSink( initialCapacity: Double, private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() override fun start(ctx: FlowConsumerContext) { if (parent != null) { @@ -52,11 +60,11 @@ public class FlowSink( delta: Long, rate: Double ) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt deleted file mode 100644 index d2fa5228..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import org.opendc.simulator.flow.FlowCounters - -/** - * Mutable implementation of the [FlowCounters] interface. - */ -internal class FlowCountersImpl : FlowCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var remaining: Double = 0.0 - override var interference: Double = 0.0 - - override fun reset() { - demand = 0.0 - actual = 0.0 - remaining = 0.0 - interference = 0.0 - } - - override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt new file mode 100644 index 00000000..d990dc61 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import org.opendc.simulator.flow.FlowCounters + +/** + * Mutable implementation of the [FlowCounters] interface. + */ +public class MutableFlowCounters : FlowCounters { + override val demand: Double + get() = _counters[0] + override val actual: Double + get() = _counters[1] + override val remaining: Double + get() = _counters[2] + override val interference: Double + get() = _counters[3] + private val _counters = DoubleArray(4) + + override fun reset() { + _counters.fill(0.0) + } + + public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + val counters = _counters + counters[0] += demand + counters[1] += actual + counters[2] += remaining + counters[3] += interference + } + + override fun toString(): String { + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 9131eb54..eaa3f7c5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -85,7 +85,7 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key) + val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) _inputs.add(provider) return provider } @@ -135,11 +135,11 @@ public class MaxMinFlowMultiplexer( /** * Helper class containing the scheduler state. */ - private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { /** * The flow counters of this scheduler. */ - @JvmField val counters = FlowCountersImpl() + @JvmField val counters = MutableFlowCounters() /** * The flow rate of the multiplexer. @@ -183,6 +183,11 @@ public class MaxMinFlowMultiplexer( private var _lastConverge: Long = Long.MIN_VALUE private var _lastConvergeInput: Input? = null + /** + * The simulation clock. + */ + private val _clock = engine.clock + /** * Register the specified [input] to this scheduler. */ @@ -195,7 +200,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity - trigger(engine.clock.millis()) + trigger(_clock.millis()) } /** @@ -447,10 +452,15 @@ public class MaxMinFlowMultiplexer( } val deltaS = delta / 1000.0 - - counters.demand += demand * deltaS - counters.actual += rate * deltaS - counters.remaining += (previousCapacity - rate) * deltaS + val demand = demand + val rate = rate + + counters.increment( + demand = demand * deltaS, + actual = rate * deltaS, + remaining = (previousCapacity - rate) * deltaS, + interference = 0.0 + ) } } @@ -458,41 +468,48 @@ public class MaxMinFlowMultiplexer( * An internal [FlowConsumer] implementation for multiplexer inputs. */ private class Input( - engine: FlowEngine, + private val engine: FlowEngine, private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey? - ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable { - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - + @JvmField val key: InterferenceKey?, + initialCapacity: Double, + ) : FlowConsumer, FlowConsumerLogic, Comparable { /** - * The actual processing speed. + * A flag to indicate that the consumer is active. */ - @JvmField var actualRate: Double = 0.0 + override val isActive: Boolean + get() = _ctx != null /** - * The processing rate that is allowed by the model constraints. + * The demand of the consumer. */ - @JvmField var allowedRate: Double = 0.0 + override val demand: Double + get() = limit /** - * The deadline of the input. + * The processing rate of the consumer. */ - val deadline: Long - get() = ctx?.deadline ?: Long.MAX_VALUE + override val rate: Double + get() = actualRate /** * The capacity of the input. */ override var capacity: Double - get() = super.capacity + get() = _capacity set(value) { allowedRate = min(limit, value) - super.capacity = value + _capacity = value + _ctx?.capacity = value } + private var _capacity = initialCapacity + + /** + * The flow counters to track the flow metrics of the consumer. + */ + override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() /** * A flag to enable timers for the input. @@ -500,7 +517,7 @@ public class MaxMinFlowMultiplexer( var enableTimers: Boolean = true set(value) { field = value - ctx?.enableTimers = value + _ctx?.enableTimers = value } /** @@ -509,9 +526,35 @@ public class MaxMinFlowMultiplexer( var shouldConsumerConverge: Boolean = true set(value) { field = value - ctx?.shouldConsumerConverge = value + _ctx?.shouldConsumerConverge = value } + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + + /** + * The deadline of the input. + */ + val deadline: Long + get() = _ctx?.deadline ?: Long.MAX_VALUE + + /** + * The [FlowConsumerContext] that is currently running. + */ + private var _ctx: FlowConsumerContext? = null + /** * A flag to indicate that the input is closed. */ @@ -527,13 +570,33 @@ public class MaxMinFlowMultiplexer( cancel() } - /* AbstractFlowConsumer */ - override fun createLogic(): FlowConsumerLogic = this + /** + * Pull the source if necessary. + */ + fun pullSync() { + _ctx?.pullSync() + } - override fun start(ctx: FlowConsumerContext) { + /* FlowConsumer */ + override fun startConsumer(source: FlowSource) { check(!_isClosed) { "Cannot re-use closed input" } + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, this) + _ctx = ctx + + ctx.capacity = capacity scheduler.registerInput(this) - super.start(ctx) + + ctx.start() + } + + override fun pull() { + _ctx?.pull() + } + + override fun cancel() { + _ctx?.close() } /* FlowConsumerLogic */ @@ -562,8 +625,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterInput(this, now) - // BUG: Cancel the connection so that `ctx` is set to `null` - cancel() + _ctx = null } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -573,13 +635,6 @@ public class MaxMinFlowMultiplexer( /* Comparable */ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) - /** - * Pull the source if necessary. - */ - fun pullSync() { - ctx?.pullSync() - } - /** * Helper method to update the flow counters of the multiplexer. */ @@ -596,14 +651,16 @@ public class MaxMinFlowMultiplexer( 1.0 } + val actualRate = actualRate + val deltaS = delta / 1000.0 val demand = limit * deltaS val actual = actualRate * deltaS - val remaining = (capacity - actualRate) * deltaS - - updateCounters(demand, actual, remaining) + val remaining = (_capacity - actualRate) * deltaS + val interference = actual * max(0.0, 1 - perfScore) - scheduler.counters.interference += actual * max(0.0, 1 - perfScore) + _counters.increment(demand, actual, remaining, interference) + scheduler.counters.increment(0.0, 0.0, 0.0, interference) } } -- cgit v1.2.3 From 54f83291aaff75ed875e507d8dbf9037d3e93710 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 10:58:43 +0200 Subject: refactor(simulator): Simplify FlowSink implementation This change simplifies the FlowSink implementation by not relying on the AbstractFlowConsumer, but instead implementing the FlowConsumer interface itself. --- .../opendc/simulator/flow/AbstractFlowConsumer.kt | 127 --------------------- .../org/opendc/simulator/flow/FlowForwarder.kt | 3 +- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 123 ++++++++++++++++---- .../opendc/simulator/flow/internal/Constants.kt | 28 +++++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 5 +- 5 files changed, 133 insertions(+), 153 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt deleted file mode 100644 index 5f1057e8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. - */ -public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { - /** - * A flag to indicate that the flow consumer is active. - */ - public override val isActive: Boolean - get() = ctx != null - - /** - * The capacity of the consumer. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The current processing rate of the consumer. - */ - public override val rate: Double - get() = ctx?.rate ?: 0.0 - - /** - * The flow processing rate demand at this instant. - */ - public override val demand: Double - get() = ctx?.demand ?: 0.0 - - /** - * The [FlowConsumerContext] that is currently running. - */ - protected var ctx: FlowConsumerContext? = null - private set - - /** - * Construct the [FlowConsumerLogic] instance for a new source. - */ - protected abstract fun createLogic(): FlowConsumerLogic - - /** - * Start the specified [FlowConsumerContext]. - */ - protected open fun start(ctx: FlowConsumerContext) { - ctx.start() - } - - /** - * The previous demand for the consumer. - */ - private var _previousDemand = 0.0 - private var _previousCapacity = 0.0 - - /** - * Update the counters of the flow consumer. - */ - protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { - val demand = _previousDemand - val capacity = _previousCapacity - - _previousDemand = ctx.demand - _previousCapacity = ctx.capacity - - if (delta <= 0) { - return - } - - val deltaS = delta / 1000.0 - val total = demand * deltaS - val work = capacity * deltaS - val actualWork = ctx.rate * deltaS - - increment(work, actualWork, (total - actualWork), 0.0) - } - - final override fun startConsumer(source: FlowSource) { - check(ctx == null) { "Consumer is in invalid state" } - val ctx = engine.newContext(source, createLogic()) - - ctx.capacity = capacity - this.ctx = ctx - - start(ctx) - } - - final override fun pull() { - ctx?.pull() - } - - final override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.close() - } - } - - override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 229fd96a..7230a966 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max @@ -241,7 +242,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } val counters = _counters - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index 170ab1c0..e9094443 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters /** @@ -35,7 +36,34 @@ public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, private val parent: FlowConvergenceListener? = null -) : AbstractFlowConsumer(engine, initialCapacity) { +) : FlowConsumer { + /** + * A flag to indicate that the flow consumer is active. + */ + public override val isActive: Boolean + get() = _ctx != null + + /** + * The capacity of the consumer. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + _ctx?.capacity = value + } + + /** + * The current processing rate of the consumer. + */ + public override val rate: Double + get() = _ctx?.rate ?: 0.0 + + /** + * The flow processing rate demand at this instant. + */ + public override val demand: Double + get() = _ctx?.demand ?: 0.0 + /** * The flow counters to track the flow metrics of the consumer. */ @@ -43,36 +71,85 @@ public class FlowSink( get() = _counters private val _counters = MutableFlowCounters() - override fun start(ctx: FlowConsumerContext) { + /** + * The current active [FlowConsumerLogic] of this sink. + */ + private var _ctx: FlowConsumerContext? = null + + override fun startConsumer(source: FlowSource) { + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, Logic(parent, _counters)) + _ctx = ctx + + ctx.capacity = capacity if (parent != null) { ctx.shouldConsumerConverge = true } - super.start(ctx) + + ctx.start() } - override fun createLogic(): FlowConsumerLogic { - return object : FlowConsumerLogic { - private val parent = this@FlowSink.parent - - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - delta: Long, - rate: Double - ) { - _counters.update(ctx, delta) - } + override fun pull() { + _ctx?.pull() + } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - _counters.update(ctx, delta) - cancel() - } + override fun cancel() { + _ctx?.close() + } + + override fun toString(): String = "FlowSink[capacity=$capacity]" + + /** + * [FlowConsumerLogic] of a sink. + */ + private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + updateCounters(ctx, delta, rate, ctx.capacity) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + updateCounters(ctx, delta, 0.0, 0.0) + + _ctx = null + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now, delta) + } + + /** + * The previous demand and capacity for the consumer. + */ + private val _previous = DoubleArray(2) + + /** + * Update the counters of the flow consumer. + */ + private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + val counters = counters + val previous = _previous + val demand = previous[0] + val capacity = previous[1] - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + previous[0] = nextDemand + previous[1] = nextCapacity + + if (delta <= 0) { + return } + + val deltaS = delta * D_MS_TO_S + val total = demand * deltaS + val work = capacity * deltaS + val actualWork = ctx.rate * deltaS + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } - - override fun toString(): String = "FlowSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt new file mode 100644 index 00000000..450195ec --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Constant for converting milliseconds into seconds. + */ +internal const val D_MS_TO_S = 1 / 1000.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index eaa3f7c5..31fb5b73 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -451,7 +452,7 @@ public class MaxMinFlowMultiplexer( return } - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val demand = demand val rate = rate @@ -653,7 +654,7 @@ public class MaxMinFlowMultiplexer( val actualRate = actualRate - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS val remaining = (_capacity - actualRate) * deltaS -- cgit v1.2.3 From 043ad5b2713164c76b09ba8cd07af9f0ca1f35e4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 11:16:49 +0200 Subject: perf(simulator): Do not update outputs if rate is unchanged --- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 31fb5b73..28743276 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -421,16 +421,19 @@ public class MaxMinFlowMultiplexer( } this.demand = demand - this.rate = rate - - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - output.push(grantedSpeed) + if (this.rate != rate) { + // Only update the outputs if the output rate has changed + this.rate = rate + + // Divide the requests over the available capacity of the input resources fairly + for (i in activeOutputs.indices) { + val output = activeOutputs[i] + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } } return deadline -- cgit v1.2.3 From 4623316cb23ce95cb8ce8db0987f948a8dc1a349 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 11:54:45 +0200 Subject: perf(simulator): Eliminate ArrayList iteration overhead This change eliminates the overhead caused by ArrayList iteration in the MaxMinFlowMultiplexer class. --- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 40 ++++++++++++++++------ 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 28743276..eab5b299 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -167,6 +167,11 @@ public class MaxMinFlowMultiplexer( */ private val _activeInputs = mutableListOf() + /** + * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList]. + */ + private var _inputArray = emptyArray() + /** * The active outputs registered with the scheduler. */ @@ -194,6 +199,7 @@ public class MaxMinFlowMultiplexer( */ fun registerInput(input: Input) { _activeInputs.add(input) + _inputArray = _activeInputs.toTypedArray() val hasActivationOutput = activationOutput != null @@ -201,6 +207,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity + trigger(_clock.millis()) } @@ -213,6 +220,8 @@ public class MaxMinFlowMultiplexer( _lastConvergeInput = null } + _activeInputs.remove(input) + // Re-run scheduler to distribute new load trigger(now) } @@ -365,12 +374,14 @@ public class MaxMinFlowMultiplexer( private fun doRunScheduler(delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + var inputArray = _inputArray + var inputSize = _inputArray.size // Update the counters of the scheduler updateCounters(delta) // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { + if (inputSize == 0) { demand = 0.0 rate = 0.0 return Long.MAX_VALUE @@ -380,35 +391,42 @@ public class MaxMinFlowMultiplexer( var availableCapacity = capacity var deadline = Long.MAX_VALUE var demand = 0.0 + var shouldRebuild = false - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { + // Pull in the work of the inputs + for (i in 0 until inputSize) { + val input = inputArray[i] input.pullSync() - // Remove outputs that have finished + // Remove inputs that have finished if (!input.isActive) { input.actualRate = 0.0 - inputIterator.remove() + shouldRebuild = true } else { demand += input.limit deadline = min(deadline, input.deadline) } } + // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs` + if (shouldRebuild) { + inputArray = activeInputs.toTypedArray() + inputSize = inputArray.size + _inputArray = inputArray + } + val rate = if (demand > capacity) { // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the // constrained capacity across the inputs. // Sort in-place the inputs based on their pushed flow. // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + inputArray.sort() // Divide the available output capacity fairly over the inputs using max-min fair sharing - val size = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / (size - i) + for (i in 0 until inputSize) { + val input = inputArray[i] + val availableShare = availableCapacity / (inputSize - i) val grantedRate = min(input.allowedRate, availableShare) availableCapacity -= grantedRate -- cgit v1.2.3 From f9483bc5782d86637777c0d21c383ce3e2c0851b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 12:23:48 +0200 Subject: perf(simulator): Optimize clock storage --- .../core/SimulationCoroutineDispatcher.kt | 44 +++++++++++++--------- .../simulator/flow/internal/FlowEngineImpl.kt | 13 +++++-- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt index e2f7874c..908e902a 100644 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt @@ -36,11 +36,6 @@ import kotlin.coroutines.CoroutineContext */ @OptIn(InternalCoroutinesApi::class) public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay { - /** - * The virtual clock of this dispatcher. - */ - override val clock: Clock = VirtualClock() - /** * Queue of ordered tasks to run. */ @@ -54,7 +49,12 @@ public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationCo /** * The current virtual time of simulation */ - private var _time = 0L + private var _clock = SimClock() + + /** + * The virtual clock of this dispatcher. + */ + override val clock: Clock = ClockAdapter(_clock) override fun dispatch(context: CoroutineContext, block: Runnable) { block.run() @@ -79,14 +79,14 @@ public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationCo } override fun toString(): String { - return "SimulationCoroutineDispatcher[time=${_time}ms, queued=${queue.size}]" + return "SimulationCoroutineDispatcher[time=${_clock.time}ms, queued=${queue.size}]" } private fun post(block: Runnable) = queue.add(TimedRunnable(block, _counter++)) private fun postDelayed(block: Runnable, delayTime: Long) = - TimedRunnable(block, _counter++, safePlus(_time, delayTime)) + TimedRunnable(block, _counter++, safePlus(_clock.time, delayTime)) .also { queue.add(it) } @@ -100,31 +100,41 @@ public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationCo override fun advanceUntilIdle(): Long { val queue = queue - val oldTime = _time - while (queue.isNotEmpty()) { - val current = queue.poll() + val clock = _clock + val oldTime = clock.time + + while (true) { + val current = queue.poll() ?: break // If the scheduled time is 0 (immediate) use current virtual time if (current.time != 0L) { - _time = current.time + clock.time = current.time } current.run() } - return _time - oldTime + return clock.time - oldTime } - private inner class VirtualClock(private val zone: ZoneId = ZoneId.systemDefault()) : Clock() { + /** + * A helper class that holds the time of the simulation. + */ + private class SimClock(@JvmField var time: Long = 0) + + /** + * A helper class to expose a [Clock] instance for this dispatcher. + */ + private class ClockAdapter(private val clock: SimClock, private val zone: ZoneId = ZoneId.systemDefault()) : Clock() { override fun getZone(): ZoneId = zone - override fun withZone(zone: ZoneId): Clock = VirtualClock(zone) + override fun withZone(zone: ZoneId): Clock = ClockAdapter(clock, zone) override fun instant(): Instant = Instant.ofEpochMilli(millis()) - override fun millis(): Long = _time + override fun millis(): Long = clock.time - override fun toString(): String = "SimulationCoroutineDispatcher.VirtualClock[time=$_time]" + override fun toString(): String = "SimulationCoroutineDispatcher.ClockAdapter[time=${clock.time}]" } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index a9234abf..450556f8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable { +internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -70,6 +70,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va */ private var batchIndex = 0 + /** + * The virtual [Clock] of this engine. + */ + override val clock: Clock + get() = _clock + private val _clock: Clock = clock + /** * Update the specified [ctx] synchronously. */ @@ -113,7 +120,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va try { // Flush the work if the engine is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - doRunEngine(clock.millis()) + doRunEngine(_clock.millis()) } } finally { batchIndex-- @@ -122,7 +129,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /* Runnable */ override fun run() { - val now = clock.millis() + val now = _clock.millis() val invocation = futureInvocations.poll() // Clear invocation from future invocation queue assert(now >= invocation.timestamp) { "Future invocations invariant violated" } -- cgit v1.2.3 From 4433185388f843140aad096dfdd88dfe2398bf2b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 16:49:55 +0200 Subject: perf(simulator): Specialize FlowEngine queues This change specializes the queues used by the FlowEngine implementation in order to reduce the overhead caused by type-erasure of generics. --- .../flow/internal/FlowConsumerContextImpl.kt | 22 ++- .../opendc/simulator/flow/internal/FlowDeque.kt | 116 ++++++++++++ .../simulator/flow/internal/FlowEngineImpl.kt | 38 +--- .../simulator/flow/internal/FlowTimerQueue.kt | 195 +++++++++++++++++++++ 4 files changed, 329 insertions(+), 42 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9a568897..0baa7880 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private var _timer: FlowEngineImpl.Timer? = null - private val _pendingTimers: ArrayDeque = ArrayDeque(5) + private var _timer: Long = Long.MAX_VALUE + private val _pendingTimers: ArrayDeque = ArrayDeque(5) override fun start() { check(_flags and ConnState == ConnPending) { "Consumer is already started" } @@ -217,8 +217,8 @@ internal class FlowConsumerContextImpl( */ fun doUpdate( now: Long, - visited: ArrayDeque, - timerQueue: PriorityQueue, + visited: FlowDeque, + timerQueue: FlowTimerQueue, isImmediate: Boolean ) { var flags = _flags @@ -326,8 +326,7 @@ internal class FlowConsumerContextImpl( // Prune the head timer if this is a delayed update val timer = if (!isImmediate) { // Invariant: Any pending timer should only point to a future timestamp - // See also `scheduleDelayed` - val timer = pendingTimers.poll() + val timer = pendingTimers.poll() ?: Long.MAX_VALUE _timer = timer timer } else { @@ -342,7 +341,7 @@ internal class FlowConsumerContextImpl( if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || flags and ConnDisableTimers != 0 || - (timer != null && newDeadline >= timer.target) + (timer != Long.MAX_VALUE && newDeadline >= timer) ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer @@ -350,12 +349,11 @@ internal class FlowConsumerContextImpl( } // Construct a timer for the new deadline and add it to the global queue of timers - val newTimer = FlowEngineImpl.Timer(this, newDeadline) - _timer = newTimer - timerQueue.add(newTimer) + _timer = newDeadline + timerQueue.add(this, newDeadline) - // A timer already exists for this connection, so add it to the queue of pending timers - if (timer != null) { + // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers + if (timer != Long.MAX_VALUE) { pendingTimers.addFirst(timer) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt new file mode 100644 index 00000000..c6cba4b7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import java.util.* + +/** + * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations. + */ +internal class FlowDeque(initialCapacity: Int = 256) { + /** + * The array of elements in the queue. + */ + private var _elements: Array = arrayOfNulls(initialCapacity) + private var _head = 0 + private var _tail = 0 + + /** + * Determine whether this queue is not empty. + */ + fun isNotEmpty(): Boolean { + return _head != _tail + } + + /** + * Add the specified [ctx] to the queue. + */ + fun add(ctx: FlowConsumerContextImpl) { + val es = _elements + var tail = _tail + + es[tail] = ctx + + tail = inc(tail, es.size) + _tail = tail + + if (_head == tail) { + doubleCapacity() + } + } + + /** + * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty. + */ + fun poll(): FlowConsumerContextImpl? { + val es = _elements + val head = _head + val ctx = es[head] + + if (ctx != null) { + es[head] = null + _head = inc(head, es.size) + } + + return ctx + } + + /** + * Clear the queue. + */ + fun clear() { + _elements.fill(null) + _head = 0 + _tail = 0 + } + + private fun inc(i: Int, modulus: Int): Int { + var x = i + if (++x >= modulus) { + x = 0 + } + return x + } + + /** + * Doubles the capacity of this deque + */ + private fun doubleCapacity() { + assert(_head == _tail) + val p = _head + val n = _elements.size + val r = n - p // number of elements to the right of p + + val newCapacity = n shl 1 + check(newCapacity >= 0) { "Sorry, deque too big" } + + val a = arrayOfNulls(newCapacity) + + _elements.copyInto(a, 0, p, r) + _elements.copyInto(a, r, 0, p) + + _elements = a + _head = 0 + _tail = n + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 450556f8..55debef0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /** * The queue of connection updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque() + private val queue = FlowDeque() /** * A priority queue containing the connection updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue() + private val futureQueue = FlowTimerQueue() /** * The stack of engine invocations to occur in the future. @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /** * The systems that have been visited during the engine cycle. */ - private val visited: ArrayDeque = ArrayDeque() + private val visited = FlowDeque() /** * The index in the batch stack. @@ -151,17 +151,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc // Execute all scheduled updates at current timestamp while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target - - if (target > now) { - break - } - - assert(target >= now) { "Internal inconsistency: found update of the past" } - - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) + val ctx = futureQueue.poll(now) ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -184,9 +175,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc } // Schedule an engine invocation for the next update to occur. - val headTimer = futureQueue.peek() - if (headTimer != null) { - trySchedule(now, futureInvocations, headTimer.target) + val headDeadline = futureQueue.peekDeadline() + if (headDeadline != Long.MAX_VALUE) { + trySchedule(now, futureInvocations, headDeadline) } } @@ -224,17 +215,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc */ fun cancel() = handle.dispose() } - - /** - * An update call for [ctx] that is scheduled for [target]. - * - * This class represents an update in the future at [target] requested by [ctx]. - */ - class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable { - override fun compareTo(other: Timer): Int { - return target.compareTo(other.target) - } - - override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt new file mode 100644 index 00000000..22a390e6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Specialized priority queue for flow timers. + */ +internal class FlowTimerQueue(initialCapacity: Int = 256) { + /** + * The binary heap of deadlines. + */ + private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE } + + /** + * The binary heap of [FlowConsumerContextImpl]s. + */ + private var _pending = arrayOfNulls(initialCapacity) + + /** + * The number of elements in the priority queue. + */ + private var size = 0 + + /** + * Register a timer for [ctx] with [deadline]. + */ + fun add(ctx: FlowConsumerContextImpl, deadline: Long) { + val i = size + val deadlines = _deadlines + if (i >= deadlines.size) { + grow() + } + + siftUp(deadlines, _pending, i, ctx, deadline) + + size = i + 1 + } + + /** + * Update all pending [FlowConsumerContextImpl]s at the timestamp [now]. + */ + fun poll(now: Long): FlowConsumerContextImpl? { + if (size == 0) { + return null + } + + val deadlines = _deadlines + val deadline = deadlines[0] + + if (now < deadline) { + return null + } + + val pending = _pending + val res = pending[0] + val s = --size + + val nextDeadline = deadlines[s] + val next = pending[s]!! + + // Clear the last element of the queue + pending[s] = null + deadlines[s] = Long.MIN_VALUE + + if (s != 0) { + siftDown(deadlines, pending, next, nextDeadline) + } + + return res + } + + /** + * Find the earliest deadline in the queue. + */ + fun peekDeadline(): Long { + return if (size == 0) Long.MAX_VALUE else _deadlines[0] + } + + /** + * Increases the capacity of the array. + */ + private fun grow() { + val oldCapacity = _deadlines.size + // Double size if small; else grow by 50% + val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1 + + _deadlines = _deadlines.copyOf(newCapacity) + _pending = _pending.copyOf(newCapacity) + } + + /** + * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is + * greater than or equal to its parent, or is the root. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param pos The position to fill. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftUp( + deadlines: LongArray, + pending: Array, + pos: Int, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = pos + + while (k > 0) { + val parent = (k - 1) ushr 1 + val parentDeadline = deadlines[parent] + + if (deadline >= parentDeadline) { + break + } + + deadlines[k] = parentDeadline + pending[k] = pending[parent] + + k = parent + } + + deadlines[k] = deadline + pending[k] = ctx + } + + /** + * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it + * is less than or equal to its children or is a leaf. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftDown( + deadlines: LongArray, + pending: Array, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = 0 + val size = size + val half = size ushr 1 + + while (k < half) { + var child = (k shl 1) + 1 + + var childDeadline = deadlines[child] + val right = child + 1 + + if (right < size) { + val rightDeadline = deadlines[right] + + if (childDeadline > rightDeadline) { + child = right + childDeadline = rightDeadline + } + } + + if (deadline <= childDeadline) { + break + } + + deadlines[k] = childDeadline + pending[k] = pending[child] + + k = child + } + + deadlines[k] = deadline + pending[k] = ctx + } +} -- cgit v1.2.3 From e2f002358e9d5be2239fa2cb7ca92c9c96a21b6f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 17:10:45 +0200 Subject: perf(simulator): Eliminate clock access in hot path This change eliminates the clock calls in the hot path, by passing the current timestamp directly as method parameter. --- .../kotlin/org/opendc/simulator/flow/FlowConnection.kt | 7 +++++++ .../org/opendc/simulator/flow/FlowConsumerContext.kt | 4 +++- .../kotlin/org/opendc/simulator/flow/FlowForwarder.kt | 4 ++++ .../simulator/flow/internal/FlowConsumerContextImpl.kt | 12 +++++++----- .../opendc/simulator/flow/internal/FlowEngineImpl.kt | 5 +---- .../opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt | 18 ++++++++++-------- 6 files changed, 32 insertions(+), 18 deletions(-) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index c327e1e9..8ff0bc76 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -51,6 +51,13 @@ public interface FlowConnection : AutoCloseable { */ public fun pull() + /** + * Pull the source. + * + * @param now The timestamp at which the connection is pulled. + */ + public fun pull(now: Long) + /** * Push the given flow [rate] over this connection. * diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index d7182497..98922ab3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -55,6 +55,8 @@ public interface FlowConsumerContext : FlowConnection { /** * Synchronously pull the source of the connection. + * + * @param now The timestamp at which the connection is pulled. */ - public fun pullSync() + public fun pullSync(now: Long) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7230a966..e3bdd7ba 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -72,6 +72,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled _innerCtx?.pull() } + override fun pull(now: Long) { + _innerCtx?.pull(now) + } + @JvmField var lastPull = Long.MAX_VALUE override fun push(rate: Double) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 0baa7880..58ca918b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -164,17 +164,21 @@ internal class FlowConsumerContextImpl( } } - override fun pull() { + override fun pull(now: Long) { val flags = _flags if (flags and ConnState != ConnActive) { return } // Mark connection as pulled - scheduleImmediate(_clock.millis(), flags or ConnPulled) + scheduleImmediate(now, flags or ConnPulled) } - override fun pullSync() { + override fun pull() { + pull(_clock.millis()) + } + + override fun pullSync(now: Long) { val flags = _flags // Do not attempt to flush the connection if the connection is closed or an update is already active @@ -182,8 +186,6 @@ internal class FlowConsumerContextImpl( return } - val now = _clock.millis() - if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { engine.scheduleSync(now, this) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 55debef0..3c79d54e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -129,11 +129,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /* Runnable */ override fun run() { - val now = _clock.millis() val invocation = futureInvocations.poll() // Clear invocation from future invocation queue - assert(now >= invocation.timestamp) { "Future invocations invariant violated" } - - doRunEngine(now) + doRunEngine(invocation.timestamp) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index eab5b299..a0fb8a4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -302,7 +302,7 @@ public class MaxMinFlowMultiplexer( // a few inputs and little changes at the same timestamp. // We always pick for option (1) unless there are no outputs available. if (activationOutput != null) { - activationOutput.pull() + activationOutput.pull(now) return } else { runScheduler(now) @@ -320,7 +320,7 @@ public class MaxMinFlowMultiplexer( return try { _schedulerActive = true - doRunScheduler(delta) + doRunScheduler(now, delta) } finally { _schedulerActive = false } @@ -371,7 +371,7 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(delta: Long): Long { + private fun doRunScheduler(now: Long, delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs var inputArray = _inputArray @@ -396,7 +396,8 @@ public class MaxMinFlowMultiplexer( // Pull in the work of the inputs for (i in 0 until inputSize) { val input = inputArray[i] - input.pullSync() + + input.pullSync(now) // Remove inputs that have finished if (!input.isActive) { @@ -595,8 +596,8 @@ public class MaxMinFlowMultiplexer( /** * Pull the source if necessary. */ - fun pullSync() { - _ctx?.pullSync() + fun pullSync(now: Long) { + _ctx?.pullSync(now) } /* FlowConsumer */ @@ -733,8 +734,8 @@ public class MaxMinFlowMultiplexer( /** * Pull this output. */ - fun pull() { - _conn?.pull() + fun pull(now: Long) { + _conn?.pull(now) } override fun onStart(conn: FlowConnection, now: Long) { @@ -772,6 +773,7 @@ public class MaxMinFlowMultiplexer( // Output is not the activation output, so trigger activation output and do not install timer for this // output (by returning `Long.MAX_VALUE`) scheduler.trigger(now) + Long.MAX_VALUE } } -- cgit v1.2.3