diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-01-23 12:46:33 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-23 12:46:33 +0100 |
| commit | bb945c2fdd7b20898e3dfccbac7da2a427418216 (patch) | |
| tree | b08dd7489ad4e5f8f752f0d21f745b91dc7a3d1f /opendc-compute | |
| parent | 9403af12f5d015497894fab5a2dea93eb094ecaf (diff) | |
Added sampleFraction and submissionTime to the workloadSpec (#295)
* Added sampleFraction and submissionTime to the workloadSpec
* Removed commented code
Diffstat (limited to 'opendc-compute')
3 files changed, 47 insertions, 28 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index e773ba1d..655bacb9 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -49,10 +49,11 @@ import kotlin.math.roundToLong */ public class ComputeWorkloadLoader( private val pathToFile: File, - private val checkpointInterval: Long, - private val checkpointDuration: Long, - private val checkpointIntervalScaling: Double, -) : WorkloadLoader() { + private val subMissionTime: String? = null, + private val checkpointInterval: Long = 0L, + private val checkpointDuration: Long = 0L, + private val checkpointIntervalScaling: Double = 1.0, +) : WorkloadLoader(subMissionTime) { /** * The logger for this instance. */ @@ -160,25 +161,11 @@ public class ComputeWorkloadLoader( * Load the trace at the specified [pathToFile]. */ override fun load(): List<Task> { - val ref = - cache.compute(pathToFile) { key, oldVal -> - val inst = oldVal?.get() - if (inst == null) { -// val path = baseDir.resolve(key) + val trace = Trace.open(pathToFile, "opendc-vm") + val fragments = parseFragments(trace) + val vms = parseMeta(trace, fragments) - logger.info { "Loading trace $key at $pathToFile" } - - val trace = Trace.open(pathToFile, "opendc-vm") - val fragments = parseFragments(trace) - val vms = parseMeta(trace, fragments) - - SoftReference(vms) - } else { - oldVal - } - } - - return checkNotNull(ref?.get()) { "Memory pressure" } + return vms } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index d121b381..60be9299 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -45,7 +45,7 @@ public data class Task( val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, - val submissionTime: Instant, + var submissionTime: Instant, val duration: Long, val trace: TraceWorkload, ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt index f4a93c24..0b551a3c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt @@ -22,24 +22,56 @@ package org.opendc.compute.workload import mu.KotlinLogging +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset -public abstract class WorkloadLoader { +public abstract class WorkloadLoader(private val submissionTime: String? = null) { private val logger = KotlinLogging.logger {} + public fun reScheduleTasks(workload: List<Task>) { + if (submissionTime == null) { + return + } + + val workloadSubmissionTime = workload.minOf({ it.submissionTime }).toEpochMilli() + val submissionTimeLong = LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli() + + val timeShift = submissionTimeLong - workloadSubmissionTime + + for (task in workload) { + task.submissionTime = Instant.ofEpochMilli(task.submissionTime.toEpochMilli() + timeShift) + } + } + public abstract fun load(): List<Task> /** * Load the workload at sample tasks until a fraction of the workload is loaded */ public fun sampleByLoad(fraction: Double): List<Task> { - val vms = this.load() + val workload = this.load() + + reScheduleTasks(workload) + + if (fraction >= 1.0) { + return workload + } + + if (fraction <= 0.0) { + throw Error("The fraction of tasks to load cannot be 0.0 or lower") + } + val res = mutableListOf<Task>() - val totalLoad = vms.sumOf { it.totalLoad } + val totalLoad = workload.sumOf { it.totalLoad } var currentLoad = 0.0 - for (entry in vms) { + val shuffledWorkload = workload.shuffled() + for (entry in shuffledWorkload) { val entryLoad = entry.totalLoad + + // TODO: ask Sacheen if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -48,7 +80,7 @@ public abstract class WorkloadLoader { res += entry } - logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } return res } |
