summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-workload/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-10-25 13:32:41 +0200
committerGitHub <noreply@github.com>2024-10-25 13:32:41 +0200
commit5a365dbc068f2a8cdfa9813c39cc84bb30e15637 (patch)
tree72716d562787b85e03cdc7fe1d30c827054d25a0 /opendc-compute/opendc-compute-workload/src/main
parent27f5b7dcb05aefdab9b762175d538931face0aba (diff)
Rewrote the FlowEngine (#256)
* Removed unused components. Updated tests. Improved checkpointing model Improved model, started with SimPowerSource implemented FailureModels and Checkpointing First working version midway commit first update All simulation are now run with a single CPU and single MemoryUnit. multi CPUs are combined into one. This is for performance and explainability. * fixed merge conflicts * Updated M3SA paths. * Fixed small typo
Diffstat (limited to 'opendc-compute/opendc-compute-workload/src/main')
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt4
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt70
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt)13
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt14
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt4
7 files changed, 41 insertions, 76 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
index c9f784ff..9516c56e 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkload.kt
@@ -29,10 +29,10 @@ import java.util.random.RandomGenerator
*/
public interface ComputeWorkload {
/**
- * Resolve the workload into a list of [VirtualMachine]s to simulate.
+ * Resolve the workload into a list of [Task]s to simulate.
*/
public fun resolve(
loader: ComputeWorkloadLoader,
random: RandomGenerator,
- ): List<VirtualMachine>
+ ): List<Task>
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 99863af8..f22bc1d1 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -23,13 +23,8 @@
package org.opendc.compute.workload
import mu.KotlinLogging
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
-import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.TraceWorkload
import org.opendc.trace.Trace
-import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
-import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import org.opendc.trace.conv.resourceCpuCapacity
@@ -52,7 +47,12 @@ import kotlin.math.roundToLong
*
* @param baseDir The directory containing the traces.
*/
-public class ComputeWorkloadLoader(private val baseDir: File) {
+public class ComputeWorkloadLoader(
+ private val baseDir: File,
+ private val checkpointInterval: Long,
+ private val checkpointDuration: Long,
+ private val checkpointIntervalScaling: Double,
+) {
/**
* The logger for this instance.
*/
@@ -61,7 +61,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* The cache of workloads.
*/
- private val cache = ConcurrentHashMap<String, SoftReference<List<VirtualMachine>>>()
+ private val cache = ConcurrentHashMap<String, SoftReference<List<Task>>>()
/**
* Read the fragments into memory.
@@ -83,7 +83,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
- val builder = fragments.computeIfAbsent(id) { Builder() }
+ val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) }
builder.add(durationMs, cpuUsage, cores)
}
@@ -99,8 +99,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
private fun parseMeta(
trace: Trace,
fragments: Map<String, Builder>,
- interferenceModel: VmInterferenceModel,
- ): List<VirtualMachine> {
+ ): List<Task> {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
val idCol = reader.resolve(resourceID)
@@ -111,7 +110,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val memCol = reader.resolve(resourceMemCapacity)
var counter = 0
- val entries = mutableListOf<VirtualMachine>()
+ val entries = mutableListOf<Task>()
return try {
while (reader.nextRow()) {
@@ -131,7 +130,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val totalLoad = builder.totalLoad
entries.add(
- VirtualMachine(
+ Task(
uid,
id,
cpuCount,
@@ -141,13 +140,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
submissionTime,
duration,
builder.build(),
- interferenceModel.getProfile(id),
),
)
}
// Make sure the virtual machines are ordered by start time
- entries.sortBy { it.startTime }
+ entries.sortBy { it.submissionTime }
entries
} catch (e: Exception) {
@@ -159,40 +157,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
}
/**
- * Read the interference model associated with the specified [trace].
- */
- private fun parseInterferenceModel(trace: Trace): VmInterferenceModel {
- val reader = checkNotNull(trace.getTable(TABLE_INTERFERENCE_GROUPS)).newReader()
-
- return try {
- val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS)
- val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET)
- val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE)
-
- val modelBuilder = VmInterferenceModel.builder()
-
- while (reader.nextRow()) {
- val members = reader.getSet(membersCol, String::class.java)!!
- val target = reader.getDouble(targetCol)
- val score = reader.getDouble(scoreCol)
-
- modelBuilder
- .addGroup(members, target, score)
- }
-
- modelBuilder.build()
- } finally {
- reader.close()
- }
- }
-
- /**
* Load the trace with the specified [name] and [format].
*/
public fun get(
name: String,
format: String,
- ): List<VirtualMachine> {
+ ): List<Task> {
val ref =
cache.compute(name) { key, oldVal ->
val inst = oldVal?.get()
@@ -203,8 +173,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val trace = Trace.open(path, format)
val fragments = parseFragments(trace)
- val interferenceModel = parseInterferenceModel(trace)
- val vms = parseMeta(trace, fragments, interferenceModel)
+ val vms = parseMeta(trace, fragments)
SoftReference(vms)
} else {
@@ -225,7 +194,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* A builder for a VM trace.
*/
- private class Builder {
+ private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double) {
/**
* The total load of the trace.
*/
@@ -234,13 +203,12 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* The internal builder for the trace.
*/
- private val builder = SimTrace.builder()
+ private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling)
/**
* Add a fragment to the trace.
*
- * @param timestamp Timestamp at which the fragment starts (in epoch millis).
- * @param deadline Timestamp at which the fragment ends (in epoch millis).
+ * @param duration The duration of the fragment (in epoch millis).
* @param usage CPU usage of this fragment.
* @param cores Number of cores used.
*/
@@ -257,6 +225,6 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
/**
* Build the trace.
*/
- fun build(): SimTrace = builder.build()
+ fun build(): TraceWorkload = builder.build()
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
index 66d51127..d121b381 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/VirtualMachine.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt
@@ -22,8 +22,7 @@
package org.opendc.compute.workload
-import org.opendc.simulator.compute.kernel.interference.VmInterferenceProfile
-import org.opendc.simulator.compute.workload.SimTrace
+import org.opendc.simulator.compute.workload.TraceWorkload
import java.time.Instant
import java.util.UUID
@@ -35,20 +34,18 @@ import java.util.UUID
* @param cpuCapacity The required CPU capacity for the VM in MHz.
* @param cpuCount The number of vCPUs in the VM.
* @param memCapacity The provisioned memory for the VM in MB.
- * @param startTime The start time of the VM.
- * @param stopTime The stop time of the VM.
+ * @param submissionTime The start time of the VM.
* @param trace The trace that belong to this VM.
* @param interferenceProfile The interference profile of this virtual machine.
*/
-public data class VirtualMachine(
+public data class Task(
val uid: UUID,
val name: String,
val cpuCount: Int,
val cpuCapacity: Double,
val memCapacity: Long,
val totalLoad: Double,
- val startTime: Instant,
+ val submissionTime: Instant,
val duration: Long,
- val trace: SimTrace,
- val interferenceProfile: VmInterferenceProfile?,
+ val trace: TraceWorkload,
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
index aba493b6..998dbb34 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/CompositeComputeWorkload.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal
import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
+import org.opendc.compute.workload.Task
import java.util.random.RandomGenerator
/**
@@ -40,12 +40,12 @@ internal class CompositeComputeWorkload(val sources: Map<ComputeWorkload, Double
override fun resolve(
loader: ComputeWorkloadLoader,
random: RandomGenerator,
- ): List<VirtualMachine> {
+ ): List<Task> {
val traces = sources.map { (source, fraction) -> fraction to source.resolve(loader, random) }
val totalLoad = traces.sumOf { (_, vms) -> vms.sumOf { it.totalLoad } }
- val res = mutableListOf<VirtualMachine>()
+ val res = mutableListOf<Task>()
for ((fraction, vms) in traces) {
var currentLoad = 0.0
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
index 4207b2be..d3bdde31 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/HpcSampledComputeWorkload.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal
import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
+import org.opendc.compute.workload.Task
import java.util.UUID
import java.util.random.RandomGenerator
@@ -53,7 +53,7 @@ internal class HpcSampledComputeWorkload(
override fun resolve(
loader: ComputeWorkloadLoader,
random: RandomGenerator,
- ): List<VirtualMachine> {
+ ): List<Task> {
val vms = source.resolve(loader, random)
val (hpc, nonHpc) =
@@ -65,7 +65,7 @@ internal class HpcSampledComputeWorkload(
val hpcSequence =
generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<VirtualMachine>()
+ val res = mutableListOf<Task>()
hpc.mapTo(res) { sample(it, index) }
res
}
@@ -74,7 +74,7 @@ internal class HpcSampledComputeWorkload(
val nonHpcSequence =
generateSequence(0) { it + 1 }
.map { index ->
- val res = mutableListOf<VirtualMachine>()
+ val res = mutableListOf<Task>()
nonHpc.mapTo(res) { sample(it, index) }
res
}
@@ -90,7 +90,7 @@ internal class HpcSampledComputeWorkload(
var nonHpcCount = 0
var nonHpcLoad = 0.0
- val res = mutableListOf<VirtualMachine>()
+ val res = mutableListOf<Task>()
if (sampleLoad) {
var currentLoad = 0.0
@@ -146,9 +146,9 @@ internal class HpcSampledComputeWorkload(
* Sample a random trace entry.
*/
private fun sample(
- entry: VirtualMachine,
+ entry: Task,
i: Int,
- ): VirtualMachine {
+ ): Task {
val uid = UUID.nameUUIDFromBytes("${entry.uid}-$i".toByteArray())
return entry.copy(uid = uid)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
index 51ddb27c..534ac6a0 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/LoadSampledComputeWorkload.kt
@@ -25,7 +25,7 @@ package org.opendc.compute.workload.internal
import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
+import org.opendc.compute.workload.Task
import java.util.random.RandomGenerator
/**
@@ -40,9 +40,9 @@ internal class LoadSampledComputeWorkload(val source: ComputeWorkload, val fract
override fun resolve(
loader: ComputeWorkloadLoader,
random: RandomGenerator,
- ): List<VirtualMachine> {
+ ): List<Task> {
val vms = source.resolve(loader, random) // fixme: Should be shuffled, otherwise the first fraction is always chosen
- val res = mutableListOf<VirtualMachine>()
+ val res = mutableListOf<Task>()
val totalLoad = vms.sumOf { it.totalLoad }
var currentLoad = 0.0
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
index 39255c59..d796341b 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/internal/TraceComputeWorkload.kt
@@ -24,7 +24,7 @@ package org.opendc.compute.workload.internal
import org.opendc.compute.workload.ComputeWorkload
import org.opendc.compute.workload.ComputeWorkloadLoader
-import org.opendc.compute.workload.VirtualMachine
+import org.opendc.compute.workload.Task
import java.util.random.RandomGenerator
/**
@@ -34,7 +34,7 @@ internal class TraceComputeWorkload(val name: String, val format: String) : Comp
override fun resolve(
loader: ComputeWorkloadLoader,
random: RandomGenerator,
- ): List<VirtualMachine> {
+ ): List<Task> {
return loader.get(name, format)
}
}