diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-10-25 13:32:41 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-25 13:32:41 +0200 |
| commit | 5a365dbc068f2a8cdfa9813c39cc84bb30e15637 (patch) | |
| tree | 72716d562787b85e03cdc7fe1d30c827054d25a0 /opendc-compute/opendc-compute-workload/src/main | |
| parent | 27f5b7dcb05aefdab9b762175d538931face0aba (diff) | |
Rewrote the FlowEngine (#256)
* Removed unused components. Updated tests.
Improved checkpointing model
Improved model, started with SimPowerSource
implemented FailureModels and Checkpointing
First working version
midway commit
first update
All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability.
* fixed merge conflicts
* Updated M3SA paths.
* Fixed small typo
Diffstat (limited to 'opendc-compute/opendc-compute-workload/src/main')
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt | 4 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt | 70 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt) | 13 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt | 6 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt | 14 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt | 6 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt | 4 |
7 files changed, 41 insertions, 76 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt index c9f784ff..9516c56e 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt @@ -29,10 +29,10 @@ import java.util.random.RandomGenerator */ public interface ComputeWorkload { /** - * Resolve the workload into a list of [VirtualMachine]s to simulate. + * Resolve the workload into a list of [Task]s to simulate. */ public fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List<VirtualMachine> + ): List<Task> } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 99863af8..f22bc1d1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,13 +23,8 @@ package org.opendc.compute.workload import mu.KotlinLogging -import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.TraceWorkload import org.opendc.trace.Trace -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES import org.opendc.trace.conv.resourceCpuCapacity @@ -52,7 +47,12 @@ import kotlin.math.roundToLong * * @param baseDir The directory containing the traces. */ -public class ComputeWorkloadLoader(private val baseDir: File) { +public class ComputeWorkloadLoader( + private val baseDir: File, + private val checkpointInterval: Long, + private val checkpointDuration: Long, + private val checkpointIntervalScaling: Double, +) { /** * The logger for this instance. */ @@ -61,7 +61,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The cache of workloads. */ - private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>() + private val cache = ConcurrentHashMap<String, SoftReference<List<Task>>>() /** * Read the fragments into memory. @@ -83,7 +83,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val builder = fragments.computeIfAbsent(id) { Builder() } + val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) } builder.add(durationMs, cpuUsage, cores) } @@ -99,8 +99,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { private fun parseMeta( trace: Trace, fragments: Map<String, Builder>, - interferenceModel: VmInterferenceModel, - ): List<VirtualMachine> { + ): List<Task> { val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() val idCol = reader.resolve(resourceID) @@ -111,7 +110,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val memCol = reader.resolve(resourceMemCapacity) var counter = 0 - val entries = mutableListOf<VirtualMachine>() + val entries = mutableListOf<Task>() return try { while (reader.nextRow()) { @@ -131,7 +130,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val totalLoad = builder.totalLoad entries.add( - VirtualMachine( + Task( uid, id, cpuCount, @@ -141,13 +140,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { submissionTime, duration, builder.build(), - interferenceModel.getProfile(id), ), ) } // Make sure the virtual machines are ordered by start time - entries.sortBy { it.startTime } + entries.sortBy { it.submissionTime } entries } catch (e: Exception) { @@ -159,40 +157,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { } /** - * Read the interference model associated with the specified [trace]. - */ - private fun parseInterferenceModel(trace: Trace): VmInterferenceModel { - val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader() - - return try { - val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS) - val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET) - val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE) - - val modelBuilder = VmInterferenceModel.builder() - - while (reader.nextRow()) { - val members = reader.getSet(membersCol, String::class.java)!! - val target = reader.getDouble(targetCol) - val score = reader.getDouble(scoreCol) - - modelBuilder - .addGroup(members, target, score) - } - - modelBuilder.build() - } finally { - reader.close() - } - } - - /** * Load the trace with the specified [name] and [format]. */ public fun get( name: String, format: String, - ): List<VirtualMachine> { + ): List<Task> { val ref = cache.compute(name) { key, oldVal -> val inst = oldVal?.get() @@ -203,8 +173,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val trace = Trace.open(path, format) val fragments = parseFragments(trace) - val interferenceModel = parseInterferenceModel(trace) - val vms = parseMeta(trace, fragments, interferenceModel) + val vms = parseMeta(trace, fragments) SoftReference(vms) } else { @@ -225,7 +194,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * A builder for a VM trace. */ - private class Builder { + private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double) { /** * The total load of the trace. */ @@ -234,13 +203,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * The internal builder for the trace. */ - private val builder = SimTrace.builder() + private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) /** * Add a fragment to the trace. * - * @param timestamp Timestamp at which the fragment starts (in epoch millis). - * @param deadline Timestamp at which the fragment ends (in epoch millis). + * @param duration The duration of the fragment (in epoch millis). * @param usage CPU usage of this fragment. * @param cores Number of cores used. */ @@ -257,6 +225,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) { /** * Build the trace. */ - fun build(): SimTrace = builder.build() + fun build(): TraceWorkload = builder.build() } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index 66d51127..d121b381 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -22,8 +22,7 @@ package org.opendc.compute.workload -import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile -import org.opendc.simulator.compute.workload.SimTrace +import org.opendc.simulator.compute.workload.TraceWorkload import java.time.Instant import java.util.UUID @@ -35,20 +34,18 @@ import java.util.UUID * @param cpuCapacity The required CPU capacity for the VM in MHz. * @param cpuCount The number of vCPUs in the VM. * @param memCapacity The provisioned memory for the VM in MB. - * @param startTime The start time of the VM. - * @param stopTime The stop time of the VM. + * @param submissionTime The start time of the VM. * @param trace The trace that belong to this VM. * @param interferenceProfile The interference profile of this virtual machine. */ -public data class VirtualMachine( +public data class Task( val uid: UUID, val name: String, val cpuCount: Int, val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, - val startTime: Instant, + val submissionTime: Instant, val duration: Long, - val trace: SimTrace, - val interferenceProfile: VmInterferenceProfile?, + val trace: TraceWorkload, ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt index aba493b6..998dbb34 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.random.RandomGenerator /** @@ -40,12 +40,12 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List<VirtualMachine> { + ): List<Task> { val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) } val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } } - val res = mutableListOf<VirtualMachine>() + val res = mutableListOf<Task>() for ((fraction, vms) in traces) { var currentLoad = 0.0 diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt index 4207b2be..d3bdde31 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.UUID import java.util.random.RandomGenerator @@ -53,7 +53,7 @@ internal class HpcSampledComputeWorkload( override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List<VirtualMachine> { + ): List<Task> { val vms = source.resolve(loader, random) val (hpc, nonHpc) = @@ -65,7 +65,7 @@ internal class HpcSampledComputeWorkload( val hpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<VirtualMachine>() + val res = mutableListOf<Task>() hpc.mapTo(res) { sample(it, index) } res } @@ -74,7 +74,7 @@ internal class HpcSampledComputeWorkload( val nonHpcSequence = generateSequence(0) { it + 1 } .map { index -> - val res = mutableListOf<VirtualMachine>() + val res = mutableListOf<Task>() nonHpc.mapTo(res) { sample(it, index) } res } @@ -90,7 +90,7 @@ internal class HpcSampledComputeWorkload( var nonHpcCount = 0 var nonHpcLoad = 0.0 - val res = mutableListOf<VirtualMachine>() + val res = mutableListOf<Task>() if (sampleLoad) { var currentLoad = 0.0 @@ -146,9 +146,9 @@ internal class HpcSampledComputeWorkload( * Sample a random trace entry. */ private fun sample( - entry: VirtualMachine, + entry: Task, i: Int, - ): VirtualMachine { + ): Task { val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray()) return entry.copy(uid = uid) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt index 51ddb27c..534ac6a0 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.random.RandomGenerator /** @@ -40,9 +40,9 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List<VirtualMachine> { + ): List<Task> { val vms = source.resolve(loader, random) // fixme: Should be shuffled, otherwise the first fraction is always chosen - val res = mutableListOf<VirtualMachine>() + val res = mutableListOf<Task>() val totalLoad = vms.sumOf { it.totalLoad } var currentLoad = 0.0 diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt index 39255c59..d796341b 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt @@ -24,7 +24,7 @@ package org.opendc.compute.workload.internal import org.opendc.compute.workload.ComputeWorkload import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.VirtualMachine +import org.opendc.compute.workload.Task import java.util.random.RandomGenerator /** @@ -34,7 +34,7 @@ internal class TraceComputeWorkload(val name: String, val format: String) : Comp override fun resolve( loader: ComputeWorkloadLoader, random: RandomGenerator, - ): List<VirtualMachine> { + ): List<Task> { return loader.get(name, format) } } |
