diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-06 13:12:18 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-08 17:11:52 +0200 |
| commit | 94fa3d33d4ef77aca5e70cc7f91ae9dca71d25e7 (patch) | |
| tree | 42ec6c1cc5fe027d6e0568986b9a81c03ab90838 | |
| parent | 3098eeb116a80ce12e6575e454d0448867478792 (diff) | |
perf(simulator): Optimize SimTraceWorkload
This change improves the performance of the SimTraceWorkload class by
changing the way trace fragments are read and processed by the CPU
consumers.
11 files changed, 405 insertions, 172 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 26089b6d..a0ff9228 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -39,6 +39,8 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -105,11 +107,11 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 183.0, 2) ), offset = 1 ) @@ -121,11 +123,11 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000, duration * 1000, 2 * 3100.0, 2), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000, duration * 1000, 2 * 73.0, 2) ), offset = 1 ) @@ -217,11 +219,11 @@ internal class SimHostTest { emptyMap(), mapOf( "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), - SimTraceWorkload.Fragment(duration * 2000L, duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 2 * 28.0, 2), + SimTraceFragment(duration * 1000L, duration * 1000, 2 * 3500.0, 2), + SimTraceFragment(duration * 2000L, duration * 1000, 0.0, 2), + SimTraceFragment(duration * 3000L, duration * 1000, 2 * 183.0, 2) ), offset = 1 ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 7c579e39..1a6624f7 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,13 +23,14 @@ package org.opendc.compute.workload import mu.KotlinLogging -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimTrace import org.opendc.trace.* import java.io.File import java.time.Duration import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap +import kotlin.math.max import kotlin.math.roundToLong /** @@ -51,7 +52,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the fragments into memory. */ - private fun parseFragments(trace: Trace): Map<String, List<SimTraceWorkload.Fragment>> { + private fun parseFragments(trace: Trace): Map<String, Builder> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val idCol = reader.resolve(RESOURCE_ID) @@ -60,7 +61,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val coresCol = reader.resolve(RESOURCE_CPU_COUNT) val usageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) - val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>() + val fragments = mutableMapOf<String, Builder>() return try { while (reader.nextRow()) { @@ -70,14 +71,10 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val fragment = SimTraceWorkload.Fragment( - time.toEpochMilli(), - duration.toMillis(), - cpuUsage, - cores - ) - - fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment) + val timeMs = time.toEpochMilli() + val deadlineMs = timeMs + duration.toMillis() + val builder = fragments.computeIfAbsent(id) { Builder() } + builder.add(timeMs, deadlineMs, cpuUsage, cores) } fragments @@ -89,7 +86,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Read the metadata into a workload. */ - private fun parseMeta(trace: Trace, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<VirtualMachine> { + private fun parseMeta(trace: Trace, fragments: Map<String, Builder>): List<VirtualMachine> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() val idCol = reader.resolve(RESOURCE_ID) @@ -115,8 +112,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val requiredMemory = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - val vmFragments = fragments.getValue(id).asSequence() - val totalLoad = vmFragments.sumOf { (it.usage * it.duration) / 1000.0 } // avg MHz * duration = MFLOPs + val builder = fragments.getValue(id) + val totalLoad = builder.totalLoad entries.add( VirtualMachine( @@ -127,7 +124,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { totalLoad, submissionTime, endTime, - vmFragments + builder.build() ) ) } @@ -165,4 +162,38 @@ public class ComputeWorkloadLoader(private val baseDir: File) { public fun reset() { cache.clear() } + + /** + * A builder for a VM trace. + */ + private class Builder { + /** + * The total load of the trace. + */ + @JvmField var totalLoad: Double = 0.0 + + /** + * The internal builder for the trace. + */ + private val builder = SimTrace.builder() + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val duration = max(0, deadline - timestamp) + totalLoad += (usage * duration) / 1000.0 // avg MHz * duration = MFLOPs + builder.add(timestamp, deadline, usage, cores) + } + + /** + * Build the trace. + */ + fun build(): SimTrace = builder.build() + } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt index 40484b68..5dd239f6 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt @@ -22,7 +22,7 @@ package org.opendc.compute.workload -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimTrace import java.time.Instant import java.util.* @@ -35,7 +35,7 @@ import java.util.* * @param memCapacity The provisioned memory for the VM. * @param startTime The start time of the VM. * @param stopTime The stop time of the VM. - * @param trace The trace fragments that belong to this VM. + * @param trace The trace that belong to this VM. */ public data class VirtualMachine( val uid: UUID, @@ -45,5 +45,5 @@ public data class VirtualMachine( val totalLoad: Double, val startTime: Instant, val stopTime: Instant, - val trace: Sequence<SimTraceWorkload.Fragment>, + val trace: SimTrace, ) diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt index a119a219..bbe130e3 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt @@ -23,12 +23,15 @@ package org.opendc.experiments.serverless.trace import org.opendc.faas.simulator.workload.SimFaaSWorkload +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload /** * A [SimFaaSWorkload] for a [FunctionTrace]. */ -public class FunctionTraceWorkload(trace: FunctionTrace) : SimFaaSWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.timestamp, it.duration, it.cpuUsage, 1) }) { +class FunctionTraceWorkload(trace: FunctionTrace) : + SimFaaSWorkload, SimWorkload by SimTraceWorkload(SimTrace.ofFragments(trace.samples.map { SimTraceFragment(it.timestamp, it.duration, it.cpuUsage, 1) })) { override suspend fun invoke() {} } diff --git a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt index b8e0227a..cb52d24f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-compute/src/jmh/kotlin/org/opendc/simulator/compute/SimMachineBenchmarks.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute -import javafx.application.Application.launch import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -34,6 +33,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit @OptIn(ExperimentalCoroutinesApi::class) class SimMachineBenchmarks { private lateinit var machineModel: MachineModel - private lateinit var trace: Sequence<SimTraceWorkload.Fragment> + private lateinit var trace: SimTrace @Setup fun setUp() { @@ -60,8 +60,13 @@ class SimMachineBenchmarks { ) val random = ThreadLocalRandom.current() - val entries = List(10000) { SimTraceWorkload.Fragment(it * 1000L, 1000, random.nextDouble(0.0, 4500.0), 1) } - trace = entries.asSequence() + val builder = SimTrace.builder() + repeat(10000) { + val timestamp = it.toLong() + val deadline = timestamp + 1000 + builder.add(timestamp, deadline, random.nextDouble(0.0, 4500.0), 1) + } + trace = builder.build() } @Benchmark diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt new file mode 100644 index 00000000..4f567b55 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTrace.kt @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource +import kotlin.math.min + +/** + * A workload trace that describes the resource utilization over time in a collection of [SimTraceFragment]s. + * + * @param usageCol The column containing the CPU usage of each fragment (in MHz). + * @param timestampCol The column containing the starting timestamp for each fragment (in epoch millis). + * @param deadlineCol The column containing the ending timestamp for each fragment (in epoch millis). + * @param coresCol The column containing the utilized cores. + * @param size The number of fragments in the trace. + */ +public class SimTrace( + private val usageCol: DoubleArray, + private val timestampCol: LongArray, + private val deadlineCol: LongArray, + private val coresCol: IntArray, + private val size: Int, +) { + init { + require(size >= 0) { "Invalid trace size" } + require(usageCol.size >= size) { "Invalid number of usage entries" } + require(timestampCol.size >= size) { "Invalid number of timestamp entries" } + require(deadlineCol.size >= size) { "Invalid number of deadline entries" } + require(coresCol.size >= size) { "Invalid number of core entries" } + } + + public companion object { + /** + * Construct a [SimTrace] with the specified fragments. + */ + public fun ofFragments(fragments: List<SimTraceFragment>): SimTrace { + val size = fragments.size + val usageCol = DoubleArray(size) + val timestampCol = LongArray(size) + val deadlineCol = LongArray(size) + val coresCol = IntArray(size) + + for (i in fragments.indices) { + val fragment = fragments[i] + usageCol[i] = fragment.usage + timestampCol[i] = fragment.timestamp + deadlineCol[i] = fragment.timestamp + fragment.duration + coresCol[i] = fragment.cores + } + + return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * Construct a [SimTrace] with the specified fragments. + */ + @JvmStatic + public fun ofFragments(vararg fragments: SimTraceFragment): SimTrace { + val size = fragments.size + val usageCol = DoubleArray(size) + val timestampCol = LongArray(size) + val deadlineCol = LongArray(size) + val coresCol = IntArray(size) + + for (i in fragments.indices) { + val fragment = fragments[i] + usageCol[i] = fragment.usage + timestampCol[i] = fragment.timestamp + deadlineCol[i] = fragment.timestamp + fragment.duration + coresCol[i] = fragment.cores + } + + return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * Create a [SimTrace.Builder] instance. + */ + @JvmStatic + public fun builder(): Builder = Builder() + } + + /** + * Construct a new [FlowSource] for the specified [cpu]. + */ + public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource { + return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size) + } + + /** + * A builder class for a [SimTrace]. + */ + public class Builder internal constructor() { + /** + * The columns of the trace. + */ + private var usageCol: DoubleArray = DoubleArray(16) + private var timestampCol: LongArray = LongArray(16) + private var deadlineCol: LongArray = LongArray(16) + private var coresCol: IntArray = IntArray(16) + + /** + * The number of entries in the trace. + */ + private var size = 0 + + /** + * Add the specified [SimTraceFragment] to the trace. + */ + public fun add(fragment: SimTraceFragment) { + add(fragment.timestamp, fragment.timestamp + fragment.duration, fragment.usage, fragment.cores) + } + + /** + * Add a fragment to the trace. + * + * @param timestamp Timestamp at which the fragment starts (in epoch millis). + * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param usage CPU usage of this fragment. + * @param cores Number of cores used. + */ + public fun add(timestamp: Long, deadline: Long, usage: Double, cores: Int) { + val size = size + + if (size == usageCol.size) { + grow() + } + + timestampCol[size] = timestamp + deadlineCol[size] = deadline + usageCol[size] = usage + coresCol[size] = cores + + this.size++ + } + + /** + * Helper function to grow the capacity of the column arrays. + */ + private fun grow() { + val arraySize = usageCol.size + val newSize = arraySize * 2 + + usageCol = usageCol.copyOf(newSize) + timestampCol = timestampCol.copyOf(newSize) + deadlineCol = deadlineCol.copyOf(newSize) + coresCol = coresCol.copyOf(newSize) + } + + /** + * Construct the immutable [SimTrace]. + */ + public fun build(): SimTrace { + return SimTrace(usageCol, timestampCol, deadlineCol, coresCol, size) + } + } + + /** + * A CPU consumer for the trace workload. + */ + private class CpuConsumer( + cpu: ProcessingUnit, + private val offset: Long, + private val usageCol: DoubleArray, + private val timestampCol: LongArray, + private val deadlineCol: LongArray, + private val coresCol: IntArray, + private val size: Int + ) : FlowSource { + private val id = cpu.id + private val coreCount = cpu.node.coreCount + + /** + * The index in the trace. + */ + private var _idx = 0 + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val size = size + val nowOffset = now - offset + + var idx = _idx + val deadlines = deadlineCol + var deadline = deadlines[idx] + + while (deadline <= nowOffset && ++idx < size) { + deadline = deadlines[idx] + } + + if (idx >= size) { + conn.close() + return Long.MAX_VALUE + } + + _idx = idx + val timestamp = timestampCol[idx] + + // Fragment is in the future + if (timestamp > nowOffset) { + conn.push(0.0) + return timestamp - nowOffset + } + + val cores = min(coreCount, coresCol[idx]) + val usage = usageCol[idx] + + conn.push(if (id < cores) usage / cores else 0.0) + return deadline - nowOffset + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt new file mode 100644 index 00000000..5285847f --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceFragment.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +/** + * A fragment of the workload trace. + * + * @param timestamp The timestamp at which the fragment starts (in epoch millis). + * @param duration The duration of the fragment (in milliseconds). + * @param usage The CPU usage during the fragment (in MHz). + * @param cores The amount of cores utilized during the fragment. + */ +public data class SimTraceFragment( + @JvmField val timestamp: Long, + @JvmField val duration: Long, + @JvmField val usage: Double, + @JvmField val cores: Int +) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 49ae5933..53c98409 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -23,10 +23,6 @@ package org.opendc.simulator.compute.workload import org.opendc.simulator.compute.SimMachineContext -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource -import kotlin.math.min /** * A [SimWorkload] that replays a workload trace consisting of multiple fragments, each indicating the resource @@ -35,89 +31,14 @@ import kotlin.math.min * @param trace The trace of fragments to use. * @param offset The offset for the timestamps. */ -public class SimTraceWorkload(public val trace: Sequence<Fragment>, private val offset: Long = 0L) : SimWorkload { - private val iterator = trace.iterator() - private var fragment: Fragment? = null - +public class SimTraceWorkload(private val trace: SimTrace, private val offset: Long = 0L) : SimWorkload { override fun onStart(ctx: SimMachineContext) { val lifecycle = SimWorkloadLifecycle(ctx) for (cpu in ctx.cpus) { - cpu.startConsumer(lifecycle.waitFor(Consumer(cpu.model))) + cpu.startConsumer(lifecycle.waitFor(trace.newSource(cpu.model, offset))) } } override fun toString(): String = "SimTraceWorkload" - - /** - * Obtain the fragment with a timestamp equal or greater than [now]. - */ - private fun pullFragment(now: Long): Fragment? { - // Return the most recent fragment if its starting time + duration is later than `now` - var fragment = fragment - if (fragment != null && fragment.timestamp + offset + fragment.duration > now) { - return fragment - } - - while (iterator.hasNext()) { - fragment = iterator.next() - if (fragment.timestamp + offset + fragment.duration > now) { - this.fragment = fragment - return fragment - } - } - - this.fragment = null - return null - } - - private inner class Consumer(cpu: ProcessingUnit) : FlowSource { - private val offset = this@SimTraceWorkload.offset - private val id = cpu.id - private val coreCount = cpu.node.coreCount - - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - val fragment = pullFragment(now) - - if (fragment == null) { - conn.close() - return Long.MAX_VALUE - } - - val timestamp = fragment.timestamp + offset - - // Fragment is in the future - if (timestamp > now) { - conn.push(0.0) - return timestamp - now - } - - val cores = min(coreCount, fragment.cores) - val usage = if (fragment.cores > 0) - fragment.usage / cores - else - 0.0 - val deadline = timestamp + fragment.duration - val duration = deadline - now - - conn.push(if (id < cores && usage > 0.0) usage else 0.0) - - return duration - } - } - - /** - * A fragment of the workload. - * - * @param timestamp The timestamp at which the fragment starts. - * @param duration The duration of the fragment. - * @param usage The CPU usage during the fragment. - * @param cores The amount of cores utilized during the fragment. - */ - public data class Fragment( - @JvmField val timestamp: Long, - @JvmField val duration: Long, - @JvmField val usage: Double, - @JvmField val cores: Int - ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 9db2e6ec..6f32cf46 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -38,6 +38,8 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.SimTraceFragment import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -66,11 +68,11 @@ internal class SimFairShareHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) @@ -106,20 +108,20 @@ internal class SimFairShareHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3100.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1) ) ) @@ -201,20 +203,20 @@ internal class SimFairShareHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) val workloadB = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 3100.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 73.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 3100.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 73.0, 1) ) ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index b05ffd22..02d308ff 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -36,9 +36,7 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimRuntimeWorkload -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine @@ -66,11 +64,11 @@ internal class SimSpaceSharedHypervisorTest { val duration = 5 * 60L val workloadA = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, duration * 1000, 28.0, 1), - SimTraceWorkload.Fragment(duration * 1000, duration * 1000, 3500.0, 1), - SimTraceWorkload.Fragment(duration * 2000, duration * 1000, 0.0, 1), - SimTraceWorkload.Fragment(duration * 3000, duration * 1000, 183.0, 1) + SimTrace.ofFragments( + SimTraceFragment(0, duration * 1000, 28.0, 1), + SimTraceFragment(duration * 1000, duration * 1000, 3500.0, 1), + SimTraceFragment(duration * 2000, duration * 1000, 0.0, 1), + SimTraceFragment(duration * 3000, duration * 1000, 183.0, 1) ), ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt index cdbffe4b..574860e8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkloadTest.kt @@ -58,11 +58,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 2), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 0 ) @@ -85,11 +85,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 2), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 1000 ) @@ -112,11 +112,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 2), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 2), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 0 ) @@ -140,11 +140,11 @@ class SimTraceWorkloadTest { ) val workload = SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(0, 1000, 2 * 28.0, 2), - SimTraceWorkload.Fragment(1000, 1000, 2 * 3100.0, 2), - SimTraceWorkload.Fragment(2000, 1000, 0.0, 0), - SimTraceWorkload.Fragment(3000, 1000, 2 * 73.0, 2) + SimTrace.ofFragments( + SimTraceFragment(0, 1000, 2 * 28.0, 2), + SimTraceFragment(1000, 1000, 2 * 3100.0, 2), + SimTraceFragment(2000, 1000, 0.0, 0), + SimTraceFragment(3000, 1000, 2 * 73.0, 2) ), offset = 0 ) |
