From bb945c2fdd7b20898e3dfccbac7da2a427418216 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Thu, 23 Jan 2025 12:46:33 +0100 Subject: Added sampleFraction and submissionTime to the workloadSpec (#295) * Added sampleFraction and submissionTime to the workloadSpec * Removed commented code --- .../compute/workload/ComputeWorkloadLoader.kt | 31 +++++----------- .../kotlin/org/opendc/compute/workload/Task.kt | 2 +- .../org/opendc/compute/workload/WorkloadLoader.kt | 42 +++++++++++++++++++--- 3 files changed, 47 insertions(+), 28 deletions(-) (limited to 'opendc-compute/opendc-compute-workload/src') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index e773ba1d..655bacb9 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -49,10 +49,11 @@ import kotlin.math.roundToLong */ public class ComputeWorkloadLoader( private val pathToFile: File, - private val checkpointInterval: Long, - private val checkpointDuration: Long, - private val checkpointIntervalScaling: Double, -) : WorkloadLoader() { + private val subMissionTime: String? = null, + private val checkpointInterval: Long = 0L, + private val checkpointDuration: Long = 0L, + private val checkpointIntervalScaling: Double = 1.0, +) : WorkloadLoader(subMissionTime) { /** * The logger for this instance. */ @@ -160,25 +161,11 @@ public class ComputeWorkloadLoader( * Load the trace at the specified [pathToFile]. */ override fun load(): List { - val ref = - cache.compute(pathToFile) { key, oldVal -> - val inst = oldVal?.get() - if (inst == null) { -// val path = baseDir.resolve(key) + val trace = Trace.open(pathToFile, "opendc-vm") + val fragments = parseFragments(trace) + val vms = parseMeta(trace, fragments) - logger.info { "Loading trace $key at $pathToFile" } - - val trace = Trace.open(pathToFile, "opendc-vm") - val fragments = parseFragments(trace) - val vms = parseMeta(trace, fragments) - - SoftReference(vms) - } else { - oldVal - } - } - - return checkNotNull(ref?.get()) { "Memory pressure" } + return vms } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index d121b381..60be9299 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -45,7 +45,7 @@ public data class Task( val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, - val submissionTime: Instant, + var submissionTime: Instant, val duration: Long, val trace: TraceWorkload, ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt index f4a93c24..0b551a3c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt @@ -22,24 +22,56 @@ package org.opendc.compute.workload import mu.KotlinLogging +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset -public abstract class WorkloadLoader { +public abstract class WorkloadLoader(private val submissionTime: String? = null) { private val logger = KotlinLogging.logger {} + public fun reScheduleTasks(workload: List) { + if (submissionTime == null) { + return + } + + val workloadSubmissionTime = workload.minOf({ it.submissionTime }).toEpochMilli() + val submissionTimeLong = LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli() + + val timeShift = submissionTimeLong - workloadSubmissionTime + + for (task in workload) { + task.submissionTime = Instant.ofEpochMilli(task.submissionTime.toEpochMilli() + timeShift) + } + } + public abstract fun load(): List /** * Load the workload at sample tasks until a fraction of the workload is loaded */ public fun sampleByLoad(fraction: Double): List { - val vms = this.load() + val workload = this.load() + + reScheduleTasks(workload) + + if (fraction >= 1.0) { + return workload + } + + if (fraction <= 0.0) { + throw Error("The fraction of tasks to load cannot be 0.0 or lower") + } + val res = mutableListOf() - val totalLoad = vms.sumOf { it.totalLoad } + val totalLoad = workload.sumOf { it.totalLoad } var currentLoad = 0.0 - for (entry in vms) { + val shuffledWorkload = workload.shuffled() + for (entry in shuffledWorkload) { val entryLoad = entry.totalLoad + + // TODO: ask Sacheen if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -48,7 +80,7 @@ public abstract class WorkloadLoader { res += entry } - logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } return res } -- cgit v1.2.3