diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-01-23 12:46:33 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-23 12:46:33 +0100 |
| commit | bb945c2fdd7b20898e3dfccbac7da2a427418216 (patch) | |
| tree | b08dd7489ad4e5f8f752f0d21f745b91dc7a3d1f | |
| parent | 9403af12f5d015497894fab5a2dea93eb094ecaf (diff) | |
Added sampleFraction and submissionTime to the workloadSpec (#295)
* Added sampleFraction and submissionTime to the workloadSpec
* Removed commented code
7 files changed, 57 insertions, 42 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index e773ba1d..655bacb9 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -49,10 +49,11 @@ import kotlin.math.roundToLong */ public class ComputeWorkloadLoader( private val pathToFile: File, - private val checkpointInterval: Long, - private val checkpointDuration: Long, - private val checkpointIntervalScaling: Double, -) : WorkloadLoader() { + private val subMissionTime: String? = null, + private val checkpointInterval: Long = 0L, + private val checkpointDuration: Long = 0L, + private val checkpointIntervalScaling: Double = 1.0, +) : WorkloadLoader(subMissionTime) { /** * The logger for this instance. */ @@ -160,25 +161,11 @@ public class ComputeWorkloadLoader( * Load the trace at the specified [pathToFile]. */ override fun load(): List<Task> { - val ref = - cache.compute(pathToFile) { key, oldVal -> - val inst = oldVal?.get() - if (inst == null) { -// val path = baseDir.resolve(key) + val trace = Trace.open(pathToFile, "opendc-vm") + val fragments = parseFragments(trace) + val vms = parseMeta(trace, fragments) - logger.info { "Loading trace $key at $pathToFile" } - - val trace = Trace.open(pathToFile, "opendc-vm") - val fragments = parseFragments(trace) - val vms = parseMeta(trace, fragments) - - SoftReference(vms) - } else { - oldVal - } - } - - return checkNotNull(ref?.get()) { "Memory pressure" } + return vms } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index d121b381..60be9299 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -45,7 +45,7 @@ public data class Task( val cpuCapacity: Double, val memCapacity: Long, val totalLoad: Double, - val submissionTime: Instant, + var submissionTime: Instant, val duration: Long, val trace: TraceWorkload, ) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt index f4a93c24..0b551a3c 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/WorkloadLoader.kt @@ -22,24 +22,56 @@ package org.opendc.compute.workload import mu.KotlinLogging +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset -public abstract class WorkloadLoader { +public abstract class WorkloadLoader(private val submissionTime: String? = null) { private val logger = KotlinLogging.logger {} + public fun reScheduleTasks(workload: List<Task>) { + if (submissionTime == null) { + return + } + + val workloadSubmissionTime = workload.minOf({ it.submissionTime }).toEpochMilli() + val submissionTimeLong = LocalDateTime.parse(submissionTime).toInstant(ZoneOffset.UTC).toEpochMilli() + + val timeShift = submissionTimeLong - workloadSubmissionTime + + for (task in workload) { + task.submissionTime = Instant.ofEpochMilli(task.submissionTime.toEpochMilli() + timeShift) + } + } + public abstract fun load(): List<Task> /** * Load the workload at sample tasks until a fraction of the workload is loaded */ public fun sampleByLoad(fraction: Double): List<Task> { - val vms = this.load() + val workload = this.load() + + reScheduleTasks(workload) + + if (fraction >= 1.0) { + return workload + } + + if (fraction <= 0.0) { + throw Error("The fraction of tasks to load cannot be 0.0 or lower") + } + val res = mutableListOf<Task>() - val totalLoad = vms.sumOf { it.totalLoad } + val totalLoad = workload.sumOf { it.totalLoad } var currentLoad = 0.0 - for (entry in vms) { + val shuffledWorkload = workload.shuffled() + for (entry in shuffledWorkload) { val entryLoad = entry.totalLoad + + // TODO: ask Sacheen if ((currentLoad + entryLoad) / totalLoad > fraction) { break } @@ -48,7 +80,7 @@ public abstract class WorkloadLoader { res += entry } - logger.info { "Sampled ${vms.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } + logger.info { "Sampled ${workload.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } return res } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt index a3414054..87c7abe9 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt @@ -32,15 +32,20 @@ import java.io.File * * @property pathToFile * @property type + * @property sampleFraction + * @property submissionTime */ @Serializable public data class WorkloadSpec( val pathToFile: String, val type: WorkloadTypes, + val sampleFraction: Double = 1.0, + val submissionTime: String? = null, ) { public val name: String = File(pathToFile).nameWithoutExtension init { + require(sampleFraction > 0) { "The fraction of the tasks can not be 0.0 or lower" } require(File(pathToFile).exists()) { "The provided path to the workload: $pathToFile does not exist " } } } @@ -65,6 +70,7 @@ public enum class WorkloadTypes { public fun getWorkloadLoader( type: WorkloadTypes, pathToFile: File, + submissionTime: String?, checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double, @@ -73,6 +79,7 @@ public fun getWorkloadLoader( WorkloadTypes.ComputeWorkload -> ComputeWorkloadLoader( pathToFile, + submissionTime, checkpointInterval, checkpointDuration, checkpointIntervalScaling, diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt index a0263e38..45fedd0d 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioReplayer.kt @@ -139,9 +139,6 @@ public suspend fun ComputeService.replay( // Wait until the task is terminated taskWatcher.wait() - - // Stop the task after reaching the end-time of the virtual machine -// task.delete() } } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index cda43eb7..c9c2729d 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -80,24 +80,16 @@ public fun runScenario( val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 -// val workloadLoader = -// ComputeWorkloadLoader( -// File(scenario.workloadSpec.pathToFile), -// checkpointInterval, -// checkpointDuration, -// checkpointIntervalScaling, -// ) -// val tasks = getWorkloadType(scenario.workloadSpec.type).resolve(workloadLoader, Random(seed)) - val workloadLoader = getWorkloadLoader( scenario.workloadSpec.type, File(scenario.workloadSpec.pathToFile), + scenario.workloadSpec.submissionTime, checkpointInterval, checkpointDuration, checkpointIntervalScaling, ) - val workload = workloadLoader.load() + val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction) val startTimeLong = workload.minOf { it.submissionTime }.toEpochMilli() val startTime = Duration.ofMillis(startTimeLong) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 8684132c..196d6a93 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -85,7 +85,7 @@ public class OpenDCRunner( /** * Helper class to load the workloads. */ - private val workloadLoader = ComputeWorkloadLoader(tracePath, 0L, 0L, 0.0) + private val workloadLoader = ComputeWorkloadLoader(tracePath) /** * The [ForkJoinPool] that is used to execute the simulation jobs. |
