From e22c97dcca7478d6941b78bdf7cd873bc0d23cdc Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 22 Jul 2025 15:47:44 +0200 Subject: Updated workload schema (#360) --- .../compute/workload/ComputeWorkloadLoader.kt | 119 ++++++++++----------- .../kotlin/org/opendc/compute/workload/Task.kt | 10 +- 2 files changed, 62 insertions(+), 67 deletions(-) (limited to 'opendc-compute/opendc-compute-workload') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 3a0ee3e0..5db2b43b 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -27,27 +27,27 @@ import org.opendc.simulator.compute.workload.trace.TraceWorkload import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy import org.opendc.trace.Trace -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceChildren -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceDeadline -import org.opendc.trace.conv.resourceDuration -import org.opendc.trace.conv.resourceGpuCapacity -import org.opendc.trace.conv.resourceGpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceNature -import org.opendc.trace.conv.resourceParents -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateGpuUsage -import org.opendc.trace.conv.resourceSubmissionTime +import org.opendc.trace.conv.FRAGMENT_CPU_USAGE +import org.opendc.trace.conv.FRAGMENT_DURATION +import org.opendc.trace.conv.FRAGMENT_GPU_USAGE +import org.opendc.trace.conv.TABLE_FRAGMENTS +import org.opendc.trace.conv.TABLE_TASKS +import org.opendc.trace.conv.TASK_CHILDREN +import org.opendc.trace.conv.TASK_CPU_CAPACITY +import org.opendc.trace.conv.TASK_CPU_COUNT +import org.opendc.trace.conv.TASK_DEADLINE +import org.opendc.trace.conv.TASK_DURATION +import org.opendc.trace.conv.TASK_GPU_CAPACITY +import org.opendc.trace.conv.TASK_GPU_COUNT +import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.conv.TASK_MEM_CAPACITY +import org.opendc.trace.conv.TASK_NAME +import org.opendc.trace.conv.TASK_NATURE +import org.opendc.trace.conv.TASK_PARENTS +import org.opendc.trace.conv.TASK_SUBMISSION_TIME import java.io.File import java.lang.ref.SoftReference import java.time.Duration -import java.util.UUID import java.util.concurrent.ConcurrentHashMap import kotlin.math.roundToLong @@ -77,23 +77,20 @@ public class ComputeWorkloadLoader( /** * Read the fragments into memory. */ - private fun parseFragments(trace: Trace): Map { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + private fun parseFragments(trace: Trace): Map { + val reader = checkNotNull(trace.getTable(TABLE_FRAGMENTS)).newReader() - val idCol = reader.resolve(resourceID) - val durationCol = reader.resolve(resourceStateDuration) - val coresCol = reader.resolve(resourceCpuCount) - val usageCol = reader.resolve(resourceStateCpuUsage) - val gpuCoresCol = reader.resolve(resourceGpuCount) - val resourceGpuCapacityCol = reader.resolve(resourceStateGpuUsage) + val idCol = reader.resolve(TASK_ID) + val durationCol = reader.resolve(FRAGMENT_DURATION) + val usageCol = reader.resolve(FRAGMENT_CPU_USAGE) + val resourceGpuCapacityCol = reader.resolve(FRAGMENT_GPU_USAGE) - val fragments = mutableMapOf() + val fragments = mutableMapOf() return try { while (reader.nextRow()) { - val id = reader.getString(idCol)!! + val id = reader.getInt(idCol) val durationMs = reader.getDuration(durationCol)!! - val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) val gpuUsage = if (reader.getDouble( @@ -104,14 +101,13 @@ public class ComputeWorkloadLoader( } else { reader.getDouble(resourceGpuCapacityCol) // Default to 0 if not present } - val gpuCores = reader.getInt(gpuCoresCol) // Default to 0 if not present - val gpuMemory = 0L // Default to 0 if not present + val gpuMemory = 0 // Default to 0 if not present val builder = fragments.computeIfAbsent( id, ) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy, id) } - builder.add(durationMs, cpuUsage, cores, gpuUsage, gpuCores, gpuMemory) + builder.add(durationMs, cpuUsage, gpuUsage, gpuMemory) } fragments @@ -125,29 +121,35 @@ public class ComputeWorkloadLoader( */ private fun parseMeta( trace: Trace, - fragments: Map, + fragments: Map, ): List { - val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() + val reader = checkNotNull(trace.getTable(TABLE_TASKS)).newReader() - val idCol = reader.resolve(resourceID) - val submissionTimeCol = reader.resolve(resourceSubmissionTime) - val durationCol = reader.resolve(resourceDuration) - val cpuCountCol = reader.resolve(resourceCpuCount) - val cpuCapacityCol = reader.resolve(resourceCpuCapacity) - val memCol = reader.resolve(resourceMemCapacity) - val gpuCapacityCol = reader.resolve(resourceGpuCapacity) // Assuming GPU capacity is also present - val gpuCoreCountCol = reader.resolve(resourceGpuCount) // Assuming GPU cores are also present - val parentsCol = reader.resolve(resourceParents) - val childrenCol = reader.resolve(resourceChildren) - val natureCol = reader.resolve(resourceNature) - val deadlineCol = reader.resolve(resourceDeadline) + val idCol = reader.resolve(TASK_ID) + val idName = reader.resolve(TASK_NAME) + val submissionTimeCol = reader.resolve(TASK_SUBMISSION_TIME) + val durationCol = reader.resolve(TASK_DURATION) + val cpuCountCol = reader.resolve(TASK_CPU_COUNT) + val cpuCapacityCol = reader.resolve(TASK_CPU_CAPACITY) + val memCol = reader.resolve(TASK_MEM_CAPACITY) + val gpuCapacityCol = reader.resolve(TASK_GPU_CAPACITY) // Assuming GPU capacity is also present + val gpuCoreCountCol = reader.resolve(TASK_GPU_COUNT) // Assuming GPU cores are also present + val parentsCol = reader.resolve(TASK_PARENTS) + val childrenCol = reader.resolve(TASK_CHILDREN) + val natureCol = reader.resolve(TASK_NATURE) + val deadlineCol = reader.resolve(TASK_DEADLINE) - var counter = 0 val entries = mutableListOf() return try { while (reader.nextRow()) { - val id = reader.getString(idCol)!! + val id = reader.getInt(idCol) + var name = reader.getString(idName) + + if (name == null) { + name = id.toString() + } + if (!fragments.containsKey(id)) { continue } @@ -169,10 +171,9 @@ public class ComputeWorkloadLoader( val gpuCoreCount = reader.getInt(gpuCoreCountCol) // Default to 0 if not present val gpuMemory = 0L // currently not implemented - val parents = reader.getSet(parentsCol, String::class.java) // No dependencies in the trace - val children = reader.getSet(childrenCol, String::class.java) // No dependencies in the trace + val parents = reader.getSet(parentsCol, Int::class.java) // No dependencies in the trace + val children = reader.getSet(childrenCol, Int::class.java) // No dependencies in the trace - val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) var nature = reader.getString(natureCol) var deadline = reader.getLong(deadlineCol) if (deferAll) { @@ -185,8 +186,8 @@ public class ComputeWorkloadLoader( entries.add( Task( - uid, id, + name, submissionTime, duration, parents!!, @@ -221,7 +222,7 @@ public class ComputeWorkloadLoader( * Load the trace at the specified [pathToFile]. */ override fun load(): List { - val trace = Trace.open(pathToFile, "opendc-vm") + val trace = Trace.open(pathToFile, "workload") val fragments = parseFragments(trace) val vms = parseMeta(trace, fragments) @@ -243,7 +244,7 @@ public class ComputeWorkloadLoader( checkpointDuration: Long, checkpointIntervalScaling: Double, scalingPolicy: ScalingPolicy, - taskName: String, + taskId: Int, ) { /** * The total load of the trace. @@ -259,7 +260,7 @@ public class ComputeWorkloadLoader( checkpointDuration, checkpointIntervalScaling, scalingPolicy, - taskName, + taskId, ) /** @@ -267,22 +268,18 @@ public class ComputeWorkloadLoader( * * @param duration The duration of the fragment (in epoch millis). * @param cpuUsage CPU usage of this fragment. - * @param cpuCores Number of cores used. * @param gpuUsage GPU usage of this fragment. - * @param gpuCores Number of GPU cores used. * @param gpuMemoryUsage GPU memory usage of this fragment. */ fun add( duration: Duration, cpuUsage: Double, - cpuCores: Int, gpuUsage: Double = 0.0, - gpuCores: Int = 0, - gpuMemoryUsage: Long = 0, + gpuMemoryUsage: Int = 0, ) { totalLoad += ((cpuUsage * duration.toMillis()) + (gpuUsage * duration.toMillis())) / 1000 // avg MHz * duration = MFLOPs - builder.add(duration.toMillis(), cpuUsage, cpuCores, gpuUsage, gpuCores, gpuMemoryUsage) + builder.add(duration.toMillis(), cpuUsage, gpuUsage, gpuMemoryUsage) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index b1ba4545..c875b8a2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -23,27 +23,25 @@ package org.opendc.compute.workload import org.opendc.simulator.compute.workload.trace.TraceWorkload -import java.util.UUID /** * A virtual machine workload. * - * @param uid The unique identifier of the virtual machine. + * @param id The unique identifier of the virtual machine. * @param name The name of the virtual machine. * @param cpuCapacity The required CPU capacity for the VM in MHz. * @param cpuCount The number of vCPUs in the VM. * @param memCapacity The provisioned memory for the VM in MB. * @param submissionTime The start time of the VM. * @param trace The trace that belong to this VM. - * @param interferenceProfile The interference profile of this virtual machine. */ public data class Task( - val uid: UUID, + val id: Int, val name: String, var submissionTime: Long, val duration: Long, - val parents: Set = emptySet(), - val children: Set = emptySet(), + val parents: Set = emptySet(), + val children: Set = emptySet(), val cpuCount: Int, val cpuCapacity: Double, val totalCpuLoad: Double, -- cgit v1.2.3