From 5a365dbc068f2a8cdfa9813c39cc84bb30e15637 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 25 Oct 2024 13:32:41 +0200 Subject: Rewrote the FlowEngine (#256) * Removed unused components. Updated tests. Improved checkpointing model Improved model, started with SimPowerSource implemented FailureModels and Checkpointing First working version midway commit first update All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. * fixed merge conflicts * Updated M3SA paths. * Fixed small typo --- .../org/opendc/compute/workload/ComputeWorkload.kt | 4 +- .../compute/workload/ComputeWorkloadLoader.kt | 70 ++++++---------------- .../kotlin/org/opendc/compute/workload/Task.kt | 51 ++++++++++++++++ .../org/opendc/compute/workload/VirtualMachine.kt | 54 ----------------- .../workload/internal/CompositeComputeWorkload.kt | 6 +- .../workload/internal/HpcSampledComputeWorkload.kt | 14 ++--- .../internal/LoadSampledComputeWorkload.kt | 6 +- .../workload/internal/TraceComputeWorkload.kt | 4 +- 8 files changed, 87 insertions(+), 122 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt (limited to 'opendc-compute/opendc-compute-workload/src/main') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index c9f784ff..9516c56e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -29,10 +29,10 @@ import java.util.random.RandomGenerator */ public interface ComputeWorkload { /** - * Resolve the workload into a list of [VirtualMachine]s to simulate. + * Resolve the workload into a list of [Task]s to simulate. */ public fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List + ): List } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 99863af8..f22bc1d1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,13 +23,8 @@ package org.opendc.compute.workload import mu.KotlinLogging -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.TraceWorkload import org.opendc.trace.Trace -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES import org.opendc.trace.conv.resourceCpuCapacity @@ -52,7 +47,12 @@ import kotlin.math.roundToLong * * @param baseDir The directory containing the traces. */ -public class ComputeWorkloadLoader(private val baseDir: File) { +public class ComputeWorkloadLoader( + private val baseDir: File, + private val checkpointInterval: Long, + private val checkpointDuration: Long, + private val checkpointIntervalScaling: Double, +) { /** * The logger for this instance. */ @@ -61,7 +61,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap>>() + private val cache = ConcurrentHashMap>>() /** * Read the fragments into memory. @@ -83,7 +83,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val builder = fragments.computeIfAbsent(id) { Builder() } + val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) } builder.add(durationMs, cpuUsage, cores) } @@ -99,8 +99,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseMeta( trace: Trace, fragments: Map, - interferenceModel: VmInterferenceModel, - ): List { + ): List { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() val idCol = reader.resolve(resourceID) @@ -111,7 +110,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val memCol = reader.resolve(resourceMemCapacity) var counter = 0 - val entries = mutableListOf() + val entries = mutableListOf() return try { while (reader.nextRow()) { @@ -131,7 +130,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val totalLoad = builder.totalLoad entries.add( - VirtualMachine( + Task( uid, id, cpuCount, @@ -141,13 +140,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { submissionTime, duration, builder.build(), - interferenceModel.getProfile(id), ), ) } // Make sure the virtual machines are ordered by start time - entries.sortBy { it.startTime } + entries.sortBy { it.submissionTime } entries } catch (e: Exception) { @@ -158,41 +156,13 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } } - /** - * Read the interference model associated with the specified [trace]. - */ - private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { - val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() - - return try { - val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) - val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) - val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) - - val modelBuilder = VmInterferenceModel.builder() - - while (reader.nextRow()) { - val members = reader.getSet(membersCol, String::class.java)!! - val target = reader.getDouble(targetCol) - val score = reader.getDouble(scoreCol) - - modelBuilder - .addGroup(members, target, score) - } - - modelBuilder.build() - } finally { - reader.close() - } - } - /** * Load the trace with the specified [name] and [format]. */ public fun get( name: String, format: String, - ): List { + ): List { val ref = cache.compute(name) { key, oldVal -> val inst = oldVal?.get() @@ -203,8 +173,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val trace = Trace.open(path, format) val fragments = parseFragments(trace) - val interferenceModel = parseInterferenceModel(trace) - val vms = parseMeta(trace, fragments, interferenceModel) + val vms = parseMeta(trace, fragments) SoftReference(vms) } else { @@ -225,7 +194,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * A builder for a VM trace. */ - private class Builder { + private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double) { /** * The total load of the trace. */ @@ -234,13 +203,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The internal builder for the trace. */ - private val builder = SimTrace.builder() + private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) /** * Add a fragment to the trace. * - * @param timestamp Timestamp at which the fragment starts (in epoch millis). - * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param duration The duration of the fragment (in epoch millis). * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ @@ -257,6 +225,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Build the trace. */ - fun build(): SimTrace = builder.build() + fun build(): TraceWorkload = builder.build() } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt new file mode 100644 index 00000000..d121b381 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload + +import org.opendc.simulator.compute.workload.TraceWorkload +import java.time.Instant +import java.util.UUID + +/** + * A virtual machine workload. + * + * @param uid The unique identifier of the virtual machine. + * @param name The name of the virtual machine. + * @param cpuCapacity The required CPU capacity for the VM in MHz. + * @param cpuCount The number of vCPUs in the VM. + * @param memCapacity The provisioned memory for the VM in MB. + * @param submissionTime The start time of the VM. + * @param trace The trace that belong to this VM. + * @param interferenceProfile The interference profile of this virtual machine. + */ +public data class Task( + val uid: UUID, + val name: String, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Long, + val totalLoad: Double, + val submissionTime: Instant, + val duration: Long, + val trace: TraceWorkload, +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt deleted file mode 100644 index 66d51127..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload - -import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile -import org.opendc.simulator.compute.workload.SimTrace -import java.time.Instant -import java.util.UUID - -/** - * A virtual machine workload. - * - * @param uid The unique identifier of the virtual machine. - * @param name The name of the virtual machine. - * @param cpuCapacity The required CPU capacity for the VM in MHz. - * @param cpuCount The number of vCPUs in the VM. - * @param memCapacity The provisioned memory for the VM in MB. - * @param startTime The start time of the VM. - * @param stopTime The stop time of the VM. - * @param trace The trace that belong to this VM. - * @param interferenceProfile The interference profile of this virtual machine. - */ -public data class VirtualMachine( - val uid: UUID, - val name: String, - val cpuCount: Int, - val cpuCapacity: Double, - val memCapacity: Long, - val totalLoad: Double, - val startTime: Instant, - val duration: Long, - val trace: SimTrace, - val interferenceProfile: VmInterferenceProfile?, -) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index aba493b6..998dbb34 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.random.RandomGenerator /** @@ -40,12 +40,12 @@ internal class CompositeComputeWorkload(val sources: Map { + ): List { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } - val res = mutableListOf() + val res = mutableListOf() for ((fraction, vms) in traces) { var currentLoad = 0.0 diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 4207b2be..d3bdde31 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.UUID import java.util.random.RandomGenerator @@ -53,7 +53,7 @@ internal class HpcSampledComputeWorkload( override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List { + ): List { val vms = source.resolve(loader, random) val (hpc, nonHpc) = @@ -65,7 +65,7 @@ internal class HpcSampledComputeWorkload( val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf() + val res = mutableListOf() hpc.mapTo(res) { sample(it, index) } res } @@ -74,7 +74,7 @@ internal class HpcSampledComputeWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf() + val res = mutableListOf() nonHpc.mapTo(res) { sample(it, index) } res } @@ -90,7 +90,7 @@ internal class HpcSampledComputeWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf() + val res = mutableListOf() if (sampleLoad) { var currentLoad = 0.0 @@ -146,9 +146,9 @@ internal class HpcSampledComputeWorkload( * Sample a random trace entry. */ private fun sample( - entry: VirtualMachine, + entry: Task, i: Int, - ): VirtualMachine { + ): Task { val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) return entry.copy(uid = uid) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index 51ddb27c..534ac6a0 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.random.RandomGenerator /** @@ -40,9 +40,9 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List { + ): List { val vms = source.resolve(loader, random) // fixme: Should be shuffled, otherwise the first fraction is always chosen - val res = mutableListOf() + val res = mutableListOf() val totalLoad = vms.sumOf { it.totalLoad } var currentLoad = 0.0 diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index 39255c59..d796341b 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,7 +24,7 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.random.RandomGenerator /** @@ -34,7 +34,7 @@ internal class TraceComputeWorkload(val name: String, val format: String) : Comp override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List { + ): List { return loader.get(name, format) } } -- cgit v1.2.3